kafka-users mailing list archives

From Vincent Rischmann <vinc...@rischmann.fr>
Subject Re: Problem when using ConsumerIterator
Date Wed, 11 Dec 2013 16:34:48 GMT
On 11/12/2013 17:09, Jun Rao wrote:
> Yes, this seems to be a bug in javaapi, could you file a jira?
>
> Normally, a consumer will create a stream once and keep iterating on the
> stream. The connection to ZK happens when the consumer connector is
> created. The connection to the brokers happens after the creation of the
> stream.
>
> Thanks,
>
> Jun
>
>
> On Wed, Dec 11, 2013 at 5:42 AM, Vincent Rischmann <vincent@rischmann.fr> wrote:
>
>> On 11/12/2013 10:34, Vincent Rischmann wrote:
>>
>>
>>> Hello,
>>>
>>> I am writing a simple program in Java using the Kafka 0.8.0 jar compiled
>>> with Scala 2.10.
>>> I have designed my program with a singleton class which holds a map of
>>> (consumer group, ConsumerConnector) and a map of (topic, Producer).
>>> This singleton class provides two methods, send(topic, object) and
>>> receive(topic, consumerGroup, Class<?> klass).
>>>
>>> The receive() method retrieves a ConsumerConnector from the map and then
>>> calls createMessageStreams() to get a new ConsumerIterator.
>>>
>>> The program launches a producer thread and a consumer thread: the
>>> producer produces 100 messages, the consumer consumes 100 messages and then
>>> stops.
>>> The program keeps running, and reruns the producer and consumer threads
>>> some time later (between 5 and 10 minutes).
>>>
>>> The first run works great, but my problem happens when it runs the second
>>> time: the producer works fine again, but the consumer DOES NOT consume
>>> anything, and I see the following in my logs:
>>>
>>> 09:19:11,689 INFO  ~ [simple_pojo_consumer_group_
>>> fenrir-1386749817005-7c45b826], begin registering consumer
>>> simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 in ZK
>>> 09:19:11,709 INFO  ~ conflict in /consumers/simple_pojo_
>>> consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
>>> data: { "pattern":"static", "subscription":{ "simple_pojo": 1 },
>>> "timestamp":"1386749951689", "version":1 } stored data: {
>>> "pattern":"static", "subscription":{ "simple_pojo": 1 },
>>> "timestamp":"1386749836099", "version":1 }
>>> 09:19:11,712 INFO  ~ I wrote this conflicted ephemeral node [{
>>> "pattern":"static", "subscription":{ "simple_pojo": 1 },
>>> "timestamp":"1386749951689", "version":1 }] at /consumers/simple_pojo_
>>> consumer_group/ids/simple_pojo_consumer_group_fenrir-1386749817005-7c45b826
>>> a while back in a different session, hence I will backoff for this node to
>>> be deleted by Zookeeper and retry
>>>
>>> This happens every 5 seconds.
>>> The Zookeeper server log contains the following each time a conflict is
>>> logged in the application:
>>>
>>> [2013-12-11 09:19:11,622] INFO Got user-level KeeperException when
>>> processing sessionid:0x142e0b5d3d80001 type:create cxid:0x1b
>>> zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error
>>> Path:/consumers/simple_pojo_consumer_group/ids/simple_
>>> pojo_consumer_group_fenrir-1386749817005-7c45b826 Error:KeeperErrorCode
>>> = NodeExists for /consumers/simple_pojo_consumer_group/ids/simple_
>>> pojo_consumer_group_fenrir-1386749817005-7c45b826
>>> (org.apache.zookeeper.server.PrepRequestProcessor)
>>>
>>> I tried searching for solutions, but did not find anything which could
>>> help me. I think I'm not using the library correctly, but I can't see
>>> how.
>>> Is it not ok to keep ConsumerConnector objects in a map and reuse them
>>> with createMessageStreams()?
>>>
>>> If it is ok, do you have any idea what could be the problem here?
>>>
>>> Thank you,
>>> Vincent.
>>>
>> Hello,
>>
>> I'm reading the source code right now, and there's something I'm not clear
>> about:
>>
>> In kafka.javaapi.consumer.ZookeeperConsumerConnector.scala,
>> createMessageStreams() directly calls underlying.consume() (line 80).
>> In kafka.consumer.ZookeeperConsumerConnector.scala,
>> createMessageStreams() throws an exception if it has been called more than
>> once (line 133).
>>
>> I'm guessing the javaapi should do the same and throw an exception if it
>> is called more than once?
>> If that is the case, then there is a bug in the
>> createMessageStreams() method of javaapi.ZookeeperConsumerConnector.scala.
>> Also, if that is the case, do I need to pre-create all streams? How costly
>> is it to create a ConsumerConnector? Does it reopen a connection to the ZK
>> server or the Kafka broker?
>>
>> Thanks.
>>
>>
Hi,

Thanks Jun, I'm going to file a JIRA tonight.

Because of this, I'm going to have to redesign how we use consumers. If I
still have this problem once the redesign is done, I will come back.
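
As a rough sketch of what such a redesign could look like (this is not the actual code: `StreamCache` and its factory are hypothetical names, and the factory stands in for a single createMessageStreams() call on a connector), each stream could be created once per key and then reused on every later receive(). It assumes Java 8 or later for computeIfAbsent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical helper: hands out one stream per key and never asks the
 * factory for a second stream for the same key.
 *
 * K would be something like "consumerGroup/topic"; S would be the stream
 * (or iterator) type. Neither is tied to the Kafka API in this sketch.
 */
final class StreamCache<K, S> {
    private final Map<K, S> streams = new ConcurrentHashMap<>();
    private final Function<K, S> factory;

    StreamCache(Function<K, S> factory) {
        this.factory = factory;
    }

    /** Returns the cached stream for the key, creating it on first use only. */
    S get(K key) {
        // computeIfAbsent runs the factory at most once per key, even when
        // several threads ask for the same key at the same time.
        return streams.computeIfAbsent(key, factory);
    }
}
```

With this shape, receive() would look up the stream in the cache and keep iterating on it, instead of calling createMessageStreams() again on each run.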
