kafka-users mailing list archives

From Nihit Purwar <npur...@sprinklr.com>
Subject Re: Kafka consumer not consuming events
Date Wed, 10 Jul 2013 06:12:07 GMT
Hi Jun,

Please see my comments inline again :)

On 10-Jul-2013, at 9:13 AM, Jun Rao <junrao@gmail.com> wrote:

> This indicates our in-memory queue is empty. So the consumer thread is
> blocked.

What should we do about this?
As I mentioned in the previous mail, events are there to be consumed.
Killing one consumer makes the other consumer consume events again.


> What about the Kafka fetcher threads? Are they blocked on anything?

One of the fetcher threads is blocked on putting to a queue, the other is sleeping.
Please look below:

"FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)

"FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
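
To make the pattern in these dumps concrete, here is a minimal Java sketch of how a fetcher parked in LinkedBlockingQueue.put() can leave a consumer parked on another queue with nothing to read. It is only an illustration, not Kafka's code: the class name, queue names, and capacities are hypothetical, and it assumes that a single fetcher thread fills several bounded queues one after another.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockedFetcherSketch {

    /** Returns { state of the fetcher thread, what the idle consumer received }. */
    static String[] simulate() throws InterruptedException {
        // Two bounded in-memory queues, one per consumer stream (hypothetical names).
        final LinkedBlockingQueue<String> stalledQueue = new LinkedBlockingQueue<String>(1);
        final LinkedBlockingQueue<String> idleQueue = new LinkedBlockingQueue<String>(1);

        // The first queue is already full and nobody is draining it.
        stalledQueue.put("chunk-0");

        Thread fetcher = new Thread(new Runnable() {
            public void run() {
                try {
                    stalledQueue.put("chunk-1"); // parks here: the queue is full
                    idleQueue.put("chunk-2");    // not reached while parked above
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "FetchRunnable-sketch");
        fetcher.setDaemon(true);
        fetcher.start();

        // Wait (up to 5 seconds) for the fetcher to park inside put().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (fetcher.getState() != Thread.State.WAITING
                && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }

        // The consumer of the second queue finds nothing, although data is pending.
        String received = idleQueue.poll(200, TimeUnit.MILLISECONDS);
        String state = fetcher.getState().toString();

        fetcher.interrupt(); // release the parked thread
        return new String[] { state, String.valueOf(received) };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] result = simulate();
        System.out.println("fetcher state: " + result[0]
                + ", idle consumer received: " + result[1]);
    }
}
```

In this sketch the consumer uses a timed poll() so that the program terminates; with take(), as in our consumer threads, it would wait indefinitely. Whether this is what actually happens inside the 0.7.2 consumer is exactly what I would like to confirm.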

> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npurwar@sprinklr.com> wrote:
> 
>> Hello Jun,
>> 
>> Please see my comments inline.
>> 
>> On 09-Jul-2013, at 8:32 PM, Jun Rao <junrao@gmail.com> wrote:
>> 
>>> I assume that each consumer instance consumes all 15 topics.
>> No, we have dedicated consumers listening only to the topic in question.
>> We did this because this queue processes huge amounts of data.
>> 
>> 
>>> Are all your
>>> consumer threads alive? If one of your thread dies, it will eventually
>>> block the consumption in other threads.
>> 
>> Yes. We can see all the threads in the thread dump.
>> We have ensured that the threads do not die due to an Exception.
>> 
>> Please look at the stack trace below. We see all the threads waiting like
>> this:
>> 
>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>   java.lang.Thread.State: WAITING (parking)
>>        at sun.misc.Unsafe.park(Native Method)
>>        - parking to wait for  <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>        at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>        at java.lang.Thread.run(Thread.java:662)
>> 
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npurwar@sprinklr.com> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>> 
>>>> Our cluster configuration:
>>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
>>>> instance running as well.
>>>> We have 15 topics defined. We are trying to use them as queues (JMS-like)
>>>> by defining the same group across different Kafka consumers.
>>>> On the consumer side, we are using the High Level Consumer.
>>>> 
>>>> However, we are seeing a weird behaviour.
>>>> One of our heavily used queues (event_queue) has 2 dedicated consumers
>>>> listening to that queue only.
>>>> This queue is defined with 150 partitions on each broker & the number of
>>>> streams defined on the 2 dedicated consumers is 150.
>>>> After a while we see that most of the consumer threads keep waiting for
>>>> events and the lag keeps growing.
>>>> If we kill one of the dedicated consumers, then the other consumer starts
>>>> getting messages in a hurry.
>>>> 
>>>> Consumer had no Full GCs.
>>>> 
>>>> How do we measure lag?
>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>>>> 
>>>> Around the time the events stopped coming to the new consumer, this was
>>>> printed in the logs:
>>>> 
>>>> [INFO] zookeeper state changed (Disconnected)
>>>> [INFO] zookeeper state changed (Disconnected)
>>>> [INFO] zookeeper state changed (SyncConnected)
>>>> [INFO] zookeeper state changed (SyncConnected)
>>>> 
>>>> Config Overridden:
>>>> Consumer:
>>>> fetch.size=3MB
>>>> autooffset.reset=largest
>>>> autocommit.interval.ms=500
>>>> Producer:
>>>> maxMessageSize=3MB
>>>> 
>>>> Please let us know if we are doing something wrong or facing some known
>>>> issue here.
>>>> 
>>>> Thanks,
>>>> Nihit
>> 
>> 

