kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Problem when using ConsumerIterator
Date Wed, 11 Dec 2013 16:09:33 GMT
Yes, this seems to be a bug in javaapi. Could you file a JIRA?

Normally, a consumer will create a stream once and keep iterating on the
stream. The connection to ZK happens when the consumer connector is
created. The connection to the brokers happens after the creation of the
stream.
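
As a rough illustration of the "create once, keep iterating" pattern, here is
a minimal sketch. StreamRegistry and its factory argument are hypothetical
names, not part of the Kafka API; the factory stands in for whatever code
creates the connector and calls createMessageStreams(). The only point is
that the expensive creation runs once per (group, topic) pair and later
calls get the same stream back.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical helper (not a Kafka class): remembers one stream per
// (consumer group, topic) pair so that stream creation happens only once.
class StreamRegistry<S> {
    private final Map<List<String>, S> streams = new ConcurrentHashMap<>();
    // Assumption: the factory wraps the real stream creation, taking
    // (topic, consumerGroup) and returning the stream to iterate on.
    private final BiFunction<String, String, S> factory;

    StreamRegistry(BiFunction<String, String, S> factory) {
        this.factory = factory;
    }

    // Returns the cached stream, invoking the factory only on the first
    // request for this (group, topic) pair.
    S streamFor(String topic, String consumerGroup) {
        List<String> key = Arrays.asList(consumerGroup, topic);
        return streams.computeIfAbsent(key, k -> factory.apply(topic, consumerGroup));
    }
}
```

With something like this, receive() would ask the registry for the stream
instead of calling createMessageStreams() on every run, so the second run
keeps iterating on the stream created during the first one.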

Thanks,

Jun


On Wed, Dec 11, 2013 at 5:42 AM, Vincent Rischmann <vincent@rischmann.fr> wrote:

> On 11/12/2013 10:34, Vincent Rischmann wrote:
>
>
>> Hello,
>>
>> I am writing a simple program in Java using the Kafka 0.8.0 jar compiled
>> with Scala 2.10.
>> I have designed my program with a singleton class which holds a map of
>> (consumer group, ConsumerConnector) and a map of (topic, Producer).
>> This singleton class provides two methods, send(topic, object) and
>> receive(topic, consumerGroup, Class<?> klass).
>>
>> The receive() method retrieves a ConsumerConnector from the map and then
>> calls createMessageStreams() to get a new ConsumerIterator.
>>
>> The program launches a producer thread and a consumer thread, the
>> producer produces 100 messages, the consumer consumes 100 messages and then
>> stops.
>> The program keeps running, and reruns the producer and consumer threads
>> some time after (between 5 and 10 minutes).
>>
>> The first run works great, but my problem happens when it runs the second
>> time: the producer works fine again, but the consumer DOES NOT consume
>> anything, and I see the following in my logs:
>>
>> 09:19:11,689 INFO  ~ [simple_pojo_consumer_group_
>> fenrir-1386749817005-7c45b826], begin registering consumer
>> simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 in ZK
>> 09:19:11,709 INFO  ~ conflict in /consumers/simple_pojo_
>> consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
>> data: { "pattern":"static", "subscription":{ "simple_pojo": 1 },
>> "timestamp":"1386749951689", "version":1 } stored data: {
>> "pattern":"static", "subscription":{ "simple_pojo": 1 },
>> "timestamp":"1386749836099", "version":1 }
>> 09:19:11,712 INFO  ~ I wrote this conflicted ephemeral node [{
>> "pattern":"static", "subscription":{ "simple_pojo": 1 },
>> "timestamp":"1386749951689", "version":1 }] at /consumers/simple_pojo_
>> consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
>> a while back in a different session, hence I will backoff for this node to
>> be deleted by Zookeeper and retry
>>
>> This happens every 5 seconds.
>> The Zookeeper server log contains the following each time a conflict is
>> logged in the application:
>>
>> [2013-12-11 09:19:11,622] INFO Got user-level KeeperException when
>> processing sessionid:0x142e0b5d3d80001 type:create cxid:0x1b
>> zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error
>> Path:/consumers/simple_pojo_consumer_group/ids/simple_
>> pojo_consumer_group_fenrir-1386749817005-7c45b826 Error:KeeperErrorCode
>> = NodeExists for /consumers/simple_pojo_consumer_group/ids/simple_
>> pojo_consumer_group_fenrir-1386749817005-7c45b826
>> (org.apache.zookeeper.server.PrepRequestProcessor)
>>
>> I tried searching for solutions, but did not find anything which could
>> help me. I think I'm not using the library correctly, but I can't see
>> how.
>> Is it not OK to keep ConsumerConnector objects in a map and reuse them
>> with createMessageStreams()?
>>
>> If it is OK, do you have any idea what the problem could be here?
>>
>> Thank you,
>> Vincent.
>>
>
> Hello,
>
> I'm reading the source code right now, and there's something I'm not clear
> about:
>
> In kafka.consumer.javaapi.ZookeeperConsumerConnector.scala, the
> createMessageStreams() directly calls underlying.consume() (line 80)
> In kafka.consumer.ZookeeperConsumerConnector.scala, the
> createMessageStreams() throws an exception if it has been called more than
> once (line 133).
>
> I'm guessing the javaapi should do the same and throw an exception if it
> is called more than once?
> If that is the case, then there is a bug in the method
> createMessageStreams() of javaapi.ZookeeperConsumerConnector.scala.
> Also, if that is the case, do I need to pre-create all streams? How costly
> is it to create a ConsumerConnector? Does it reopen a connection to the ZK
> server or the Kafka broker?
>
> Thanks.
>
>
