kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Reuse ConsumerConnector
Date Wed, 10 Dec 2014 20:19:41 GMT
Hey Guys,
I have a use case where my thread reads from different Kafka topics
periodically through a timer. This is how I read from Kafka in the timer
callback:

try {
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(kafkaTopic);

    int index = 0;
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        this.subscriberExecutor.execute(new Subscriber(stream,
                namespace, sendTopic, producerDataChannel, index));
        index++;
    }
} catch (Exception ex) {
    logger.error("failed to create message stream", ex);
}

As you can see, I am reusing consumerConnector and never call shutdown.
My question is:
should I create a new ConsumerConnector each time in my timer callback and
shut it down when I see a ConsumerTimeoutException? Or is it OK to reuse the
same ConsumerConnector every time? Is there any performance impact
from creating a ConsumerConnector repeatedly?

When using the same consumer to read from the same topic twice, I see:
"I wrote this conflicted ephemeral node
[{"version":1,"subscription":{"My_topic_1":3},"pattern":"static","timestamp":"1418241613803"}]
at /consumers/my_subscriber/my_subscriber_dev-trgt02-1418241313609-bbc26e95
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry"

What does this message actually mean?

Thanks,
Chen
