kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Multiple Processes Consuming from Same GroupID
Date Thu, 12 Sep 2013 14:40:42 GMT
That's still the consumer log, not the broker log.

Thanks,

Jun


On Wed, Sep 11, 2013 at 10:24 PM, prashant amar <amasindhu@gmail.com> wrote:

> From the broker log:
>
>
> INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
> at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
> at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
> at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
>
>
> On Wed, Sep 11, 2013 at 10:11 PM, Jun Rao <junrao@gmail.com> wrote:
>
> > This means the broker somehow closed the socket connection. Anything in the
> > broker log around the same time?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Sep 11, 2013 at 6:07 PM, prashant amar <amasindhu@gmail.com>
> > wrote:
> >
> > > Also noticed another issue
> > >
> > > Specified below is the current configuration
> > >
> > > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> > > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> > >
> > > Notice that I have used the same naming convention on the consumer group
> > > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of
> > > topics.
> > >
> > > On calling the *ConsumerOffsetChecker* API, I am receiving a
> > > ClosedChannelException
> > >
> > > (Check Trace Below)
> > >
> > > Is there any namespace collision occurring here? This issue is
> > > reproducible with the setup above.
> > >
> > >
> > > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2
> > > --zkconnect localhost:2181*
> > >
> > >
> > > [2013-09-12 01:01:59,701] INFO Initiating client connection,
> > > connectString=localhost:2181 sessionTimeout=30000
> > > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45
> > > (org.apache.zookeeper.ZooKeeper)
> > > [2013-09-12 01:01:59,724] INFO Opening socket connection to server
> > > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2013-09-12 01:01:59,732] INFO Socket connection established to
> > > localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> > > [2013-09-12 01:01:59,741] INFO Session establishment complete on server
> > > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout
> > > = 30000 (org.apache.zookeeper.ClientCnxn)
> > > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected)
> > > (org.I0Itec.zkclient.ZkClient)
> > > Group  Topic  Pid  Offset  logSize  Lag  Owner
> > > gr2    pe1    0    129985  130625   640  none
> > > gr2    pe1    1    0       0        0    none
> > > gr2    pe2    0    130493  130493   0    gr2_ip-XXXXXXXXXX-6c6f5d94-0
> > > [2013-09-12 01:02:00,514] INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
> > > java.nio.channels.ClosedChannelException
> > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> > > at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> > > at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
> > > at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> > > at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> > > at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > > at scala.collection.immutable.List.foreach(List.scala:45)
> > > at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
> > > at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> > > at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> > > at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> > > at scala.collection.immutable.List.foreach(List.scala:45)
> > > at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
> > > at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> > > gr2    pe2    1    0       0        0    gr2_ip-XXXXXXX-6c6f5d94-1
> > > [2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread.
> > > (org.I0Itec.zkclient.ZkEventThread)
> > > [2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed
> > > (org.apache.zookeeper.ZooKeeper)
> > > [2013-09-12 01:02:00,526] INFO EventThread shut down
> > > (org.apache.zookeeper.ClientCnxn)
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Sep 11, 2013 at 5:46 PM, Neha Narkhede <neha.narkhede@gmail.com>
> > > wrote:
> > >
> > > > I think you are hitting this -
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F
> > > >
> > > > Let us know if we can improve the documentation to make it clearer.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Wed, Sep 11, 2013 at 5:28 PM, prashant amar <amasindhu@gmail.com>
> > > > wrote:
> > > >
> > > > > Also attempted another pattern where
> > > > >
> > > > > 1. Created a topic T with 'n' partitions.
> > > > > 2. Created a consumer group process with 'n + 1' threads subscribing
> > > > > to topic 'T' with a groupID 'y'
> > > > > 3. Added another consumer group process with 'n + 1' threads
> > > > > subscribing to the same topic 'T' with a different groupID 'z'
> > > > > (Note that 2 and 3 subscribe to the same topic but in different groups)
> > > > >
> > > > > Can a single topic with multiple partitions, combined with multiple
> > > > > consumer groups, increase parallelism in consumption?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Sep 11, 2013 at 4:48 PM, prashant amar <amasindhu@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > A Design Question that needs verification:
> > > > > >
> > > > > > 1. Created a topic T with 'n' partitions.
> > > > > > 2. Created a consumer group process with 'n + 1' threads
> > > > > > subscribing from topic 'T' with a groupID 'y'
> > > > > > 3. Added another consumer group process with 'n + 1' threads
> > > > > > subscribing from same topic 'T' with same groupID 'y'
> > > > > >
> > > > > > On doing so, I noticed that the previous consumer group process
> > > > > > stops consuming and the new consumer begins to consume.
> > > > > >
> > > > > > I was attempting to model on-demand parallelization in the event
> > > > > > that a consumer group cannot keep up with the events produced.
> > > > > > Rather than increase the threadpool capacity in the same process,
> > > > > > it would make sense to distribute the load across multiple processes.
> > > > > >
> > > > > > Advice please?
> > > > > >
> > > > > > Regards
> > > > > > Amardeep
> > > > > >
> > > > >
> > > >
> > > >
> > >
> >
>
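
The behavior reported in the original question (the first process stops consuming once a second process joins the same group) is consistent with what the linked FAQ entry describes: a partition is consumed by at most one thread in a group, so with 'n' partitions and 2 * (n + 1) threads, most threads own nothing. The following is a minimal sketch of a range-style assignment over sorted thread ids. It is a simplified model written for illustration, not Kafka's actual rebalance code, and the thread ids `y_hostA-*` and `y_hostB-*` are hypothetical.

```python
def assign_partitions(partitions, consumer_threads):
    """Split sorted partitions into contiguous ranges over sorted thread ids.

    Simplified model (an assumption, not Kafka's implementation): each
    partition goes to exactly one thread, and threads beyond the number
    of partitions receive an empty list.
    """
    parts = sorted(partitions)
    threads = sorted(consumer_threads)
    per_thread, extra = divmod(len(parts), len(threads))
    assignment = {}
    for pos, thread in enumerate(threads):
        # The first `extra` threads each take one additional partition.
        start = per_thread * pos + min(pos, extra)
        count = per_thread + (1 if pos < extra else 0)
        assignment[thread] = parts[start:start + count]
    return assignment


n = 2  # number of partitions in topic T
partitions = list(range(n))

# Hypothetical thread ids: both processes use groupID 'y', n + 1 threads each.
old_process = [f"y_hostB-{i}" for i in range(n + 1)]
new_process = [f"y_hostA-{i}" for i in range(n + 1)]

assignment = assign_partitions(partitions, old_process + new_process)
for thread, owned in sorted(assignment.items()):
    print(thread, owned)
```

In this model the ids of the new process happen to sort first, so its first two threads take both partitions and every thread of the old process is left idle. Under the same assumption, running at most 'n' threads in total across all processes of one group would give each thread at least one partition.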
