kafka-users mailing list archives

From Vincent Rischmann <vinc...@rischmann.fr>
Subject Problem when using ConsumerIterator
Date Wed, 11 Dec 2013 09:34:53 GMT


I am writing a simple program in Java using the Kafka 0.8.0 jar compiled 
with Scala 2.10.
I have designed my program around a singleton class which holds a map of
(consumer group, ConsumerConnector) and a map of (topic, Producer).
This singleton class provides two methods, send(topic, object) and 
receive(topic, consumerGroup, Class<?> klass).

The receive() method retrieves a ConsumerConnector from the map and then
calls createMessageStreams() to get a new stream, from which it obtains a
ConsumerIterator.
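
To make the design concrete, here is a minimal sketch of the pattern
described above. It is not the actual program: FakeConnector is a
hypothetical stand-in for ConsumerConnector so that the example runs
without the Kafka jar, and MessagingHub, connectorFor() and the string
returned by its createMessageStreams() are illustrative names only. The
point is just that every receive() call for a given consumer group reuses
the same cached connector and asks it for streams again.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MessagingHub {
    /** Hypothetical stand-in for Kafka's ConsumerConnector; NOT the real API. */
    static class FakeConnector {
        final String group;
        // Counts how many times streams were requested from this connector.
        final AtomicInteger streamRequests = new AtomicInteger();

        FakeConnector(String group) {
            this.group = group;
        }

        /** Mimics asking the connector for a new stream; returns a label only. */
        String createMessageStreams(String topic) {
            int n = streamRequests.incrementAndGet();
            return group + "/" + topic + "#" + n;
        }
    }

    private static final MessagingHub INSTANCE = new MessagingHub();

    // One cached connector per consumer group, as in the design described above.
    private final Map<String, FakeConnector> connectors = new ConcurrentHashMap<>();

    private MessagingHub() {}

    static MessagingHub getInstance() {
        return INSTANCE;
    }

    /** Returns the cached connector for the group, creating it on first use. */
    FakeConnector connectorFor(String group) {
        return connectors.computeIfAbsent(group, FakeConnector::new);
    }

    /** Each call reuses the cached connector and requests streams again. */
    String receive(String topic, String group) {
        return connectorFor(group).createMessageStreams(topic);
    }

    public static void main(String[] args) {
        MessagingHub hub = MessagingHub.getInstance();
        // First run and second run go through the same connector instance.
        System.out.println(hub.receive("simple_pojo", "simple_pojo_consumer_group"));
        System.out.println(hub.receive("simple_pojo", "simple_pojo_consumer_group"));
    }
}
```

In this sketch the second receive() call hits the same connector object as
the first, which is the reuse the question below is about.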

The program launches a producer thread and a consumer thread. The
producer produces 100 messages; the consumer consumes 100 messages and
then stops.
The program keeps running and reruns the producer and consumer threads
some time later (between 5 and 10 minutes).

The first run works great, but my problem happens when it runs the
second time: the producer works fine again, but the consumer DOES NOT
consume anything, and I see the following in my logs:

09:19:11,689 INFO  ~ 
[simple_pojo_consumer_group_fenrir-1386749817005-7c45b826], begin 
registering consumer 
simple_pojo_consumer_group_fenrir-1386749817005-7c45b826 in ZK
09:19:11,709 INFO  ~ conflict in 

data: { "pattern":"static", "subscription":{ "simple_pojo": 1 }, 
"timestamp":"1386749951689", "version":1 } stored data: { 
"pattern":"static", "subscription":{ "simple_pojo": 1 }, 
"timestamp":"1386749836099", "version":1 }
09:19:11,712 INFO  ~ I wrote this conflicted ephemeral node [{ 
"pattern":"static", "subscription":{ "simple_pojo": 1 }, 
"timestamp":"1386749951689", "version":1 }] at 

a while back in a different session, hence I will backoff for this node 
to be deleted by Zookeeper and retry

This happens every 5 seconds.
The Zookeeper server log contains the following each time a conflict
is logged in the application:

[2013-12-11 09:19:11,622] INFO Got user-level KeeperException when 
processing sessionid:0x142e0b5d3d80001 type:create cxid:0x1b 
zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error 

Error:KeeperErrorCode = NodeExists for 


I tried searching for solutions, but did not find anything which could
help me. I think I'm not using the library correctly, but I can't see
what I am doing wrong.
Is it not OK to keep ConsumerConnector objects in a map and reuse them
with createMessageStreams()?

If it is OK, do you have any idea what the problem could be here?

Thank you,
