kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Consumer not able to create message streams
Date Fri, 06 Dec 2013 16:10:47 GMT
I don't see createMessageStreams in the thread dump though. Are you sure
it's stuck there?
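
One way to answer this from inside the process, rather than reading the dump by eye, is to scan every live thread's stack for the method name. The following is a minimal sketch that uses only the JDK; the class name `StackScan` and the helper `threadsInMethod` are made up for illustration and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class StackScan {
    /** Returns the names of live threads that have a frame for the given method name on their stack. */
    static List<String> threadsInMethod(String methodName) {
        List<String> hits = new ArrayList<String>();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                // Match on the bare method name only; the declaring class is ignored.
                if (frame.getMethodName().equals(methodName)) {
                    hits.add(entry.getKey().getName());
                    break;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // An empty list means no thread is currently inside a method with this name.
        System.out.println(threadsInMethod("createMessageStreams"));
    }
}
```

If the list comes back empty while the application appears hung, the call has probably already returned (or was never reached), which is what the dump in this thread suggests.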

Thanks,

Jun


On Fri, Dec 6, 2013 at 1:58 AM, Tarang Dawer <tarang.dawer@gmail.com> wrote:

> Hi Jun,
> Please find the consumer thread dump in my previous reply.
>
>
> On Fri, Dec 6, 2013 at 3:27 PM, Tarang Dawer <tarang.dawer@gmail.com>
> wrote:
>
> > Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.8-b03 mixed mode):
> >
> > "DestroyJavaVM" prio=10 tid=0x00007feb18006800 nid=0xcc2 waiting on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "consumerGroup675437781_impetus-d898-1386323237902-426eefe2_watcher_executor" prio=10 tid=0x00007feb18059000 nid=0xcd9 waiting on condition [0x00007feb143cd000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x00000000ffbe80f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2116)
> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:319)
> >
> > "main-EventThread" daemon prio=10 tid=0x00007feb18321800 nid=0xcd8 waiting on condition [0x00007feb144ce000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x00000000ffbf00a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >     at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:414)
> >
> > "main-SendThread" daemon prio=10 tid=0x00007feb18145800 nid=0xcd7 runnable [0x00007feb145cf000]
> >    java.lang.Thread.State: RUNNABLE
> >     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> >     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> >     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> >     - locked <0x00000000ffbf8118> (a sun.nio.ch.Util$2)
> >     - locked <0x00000000ffbf8128> (a java.util.Collections$UnmodifiableSet)
> >     - locked <0x00000000ffbf80d0> (a sun.nio.ch.EPollSelectorImpl)
> >     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> >     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:921)
> >
> > "ZkClient-EventThread-15-192.168.145.144:2181" daemon prio=10 tid=0x00007feb181d6800 nid=0xcd6 waiting on condition [0x00007feb147d8000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x00000000ffbe81c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:68)
> >
> > "ProducerSendThread-" prio=10 tid=0x00007feb182d4000 nid=0xcd5 waiting on condition [0x00007feb148d9000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x00000000ffbe82f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> >     at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
> >     at scala.collection.immutable.Stream$.continually(Stream.scala:598)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >
> > "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007feb1811b000 nid=0xcd4 waiting on condition [0x00007feb149da000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x00000000ffbf81a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> >     at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
> >     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> >     at java.lang.Thread.run(Thread.java:662)
> >
> > "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007feb181a8000 nid=0xcd3 waiting on condition [0x00007feb14adb000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x00000000ffbf81a8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> >     at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
> >     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> >     at java.lang.Thread.run(Thread.java:662)
> >
> > "Abandoned connection cleanup thread" daemon prio=10 tid=0x00007feb18151000 nid=0xcd2 in Object.wait() [0x00007feb14ce3000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x00000000ffbf8330> (a java.lang.ref.ReferenceQueue$Lock)
> >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> >     - locked <0x00000000ffbf8330> (a java.lang.ref.ReferenceQueue$Lock)
> >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> >     at com.mysql.jdbc.NonRegisteringDriver$1.run(NonRegisteringDriver.java:93)
> >
> > "Low Memory Detector" daemon prio=10 tid=0x00007feb18090800 nid=0xccd runnable [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "C2 CompilerThread1" daemon prio=10 tid=0x00007feb1808e000 nid=0xccc waiting on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "C2 CompilerThread0" daemon prio=10 tid=0x00007feb1808b800 nid=0xccb waiting on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "Signal Dispatcher" daemon prio=10 tid=0x00007feb18089000 nid=0xcca waiting on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "Finalizer" daemon prio=10 tid=0x00007feb1806d000 nid=0xcc9 in Object.wait() [0x00007feb1d407000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x00000000ffbe8910> (a java.lang.ref.ReferenceQueue$Lock)
> >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> >     - locked <0x00000000ffbe8910> (a java.lang.ref.ReferenceQueue$Lock)
> >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> >     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
> >
> > "Reference Handler" daemon prio=10 tid=0x00007feb1806b000 nid=0xcc8 in Object.wait() [0x00007feb1d508000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x00000000ffbe8aa0> (a java.lang.ref.Reference$Lock)
> >     at java.lang.Object.wait(Object.java:485)
> >     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
> >     - locked <0x00000000ffbe8aa0> (a java.lang.ref.Reference$Lock)
> >
> > "VM Thread" prio=10 tid=0x00007feb18064800 nid=0xcc7 runnable
> >
> > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007feb18019800 nid=0xcc3 runnable
> >
> > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007feb1801b800 nid=0xcc4 runnable
> >
> > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007feb1801d800 nid=0xcc5 runnable
> >
> > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007feb1801f000 nid=0xcc6 runnable
> >
> > "VM Periodic Task Thread" prio=10 tid=0x00007feb180a3000 nid=0xcce waiting on condition
> >
> > JNI global references: 1310
> >
> > Heap
> >  PSYoungGen      total 29888K, used 26071K [0x00000000fdeb0000, 0x0000000100000000, 0x0000000100000000)
> >   eden space 25664K, 92% used [0x00000000fdeb0000,0x00000000ff5f5f60,0x00000000ff7c0000)
> >   from space 4224K, 53% used [0x00000000ffbe0000,0x00000000ffe10050,0x0000000100000000)
> >   to   space 4224K, 0% used [0x00000000ff7c0000,0x00000000ff7c0000,0x00000000ffbe0000)
> >  PSOldGen        total 68288K, used 0K [0x00000000f9c00000, 0x00000000fdeb0000, 0x00000000fdeb0000)
> >   object space 68288K, 0% used [0x00000000f9c00000,0x00000000f9c00000,0x00000000fdeb0000)
> >  PSPermGen       total 29888K, used 29878K [0x00000000f4a00000, 0x00000000f6730000, 0x00000000f9c00000)
> >   object space 29888K, 99% used [0x00000000f4a00000,0x00000000f672dad8,0x00000000f6730000)
> >
> >
> >
> > On Wed, Dec 4, 2013 at 11:11 PM, Jun Rao <junrao@gmail.com> wrote:
> >
> >> Could you take a thread dump and see where the createMessageStreams is
> >> stuck?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Dec 4, 2013 at 4:10 AM, Tarang Dawer <tarang.dawer@gmail.com>
> >> wrote:
> >>
> >> > Hi All
> >> >
> >> > I am using Kafka 0.8 with the built-in ZooKeeper.
> >> > I am running consumers in a jar.
> >> >
> >> >
> >> >
> >> > Configuration properties for creating the ConsumerConfig:
> >> >
> >> > zookeeper.connect=IP
> >> > group.id=consumerGroup
> >> > fetch.message.max.bytes=1000000000
> >> > zookeeper.session.timeout.ms=60000
> >> > auto.offset.reset=smallest
> >> > zookeeper.sync.time.ms=200
> >> > auto.commit.enable=false
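
For reference, the settings quoted above can be assembled in code as a plain `java.util.Properties` object, which is what the `new ConsumerConfig(props)` call later in this message expects. This is only a sketch: the class name `ConsumerProps` is invented for illustration, and the ZooKeeper address is passed in by the caller because the original message gives it only as "IP".

```java
import java.util.Properties;

final class ConsumerProps {
    /** Builds the consumer settings quoted in the message; zkConnect is supplied by the caller. */
    static Properties build(String zkConnect) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", "consumerGroup");
        props.setProperty("fetch.message.max.bytes", "1000000000"); // about 1 GB per fetch, as posted
        props.setProperty("zookeeper.session.timeout.ms", "60000");
        props.setProperty("auto.offset.reset", "smallest");
        props.setProperty("zookeeper.sync.time.ms", "200");
        props.setProperty("auto.commit.enable", "false");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:2181" is a hypothetical address used only to make the sketch runnable.
        Properties props = build("localhost:2181");
        props.list(System.out);
    }
}
```

Every value is set as a string because `Properties` stores string pairs.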
> >> >
> >> > While running it, the consumer sometimes gets stuck on the message
> >> > stream creation call, i.e.:
> >> >
> >> >
> >> > ConsumerConfig consumerConfig = new ConsumerConfig(this.props);
> >> >
> >> > ConsumerConnector consumerConnector = Consumer
> >> >         .createJavaConsumerConnector(consumerConfig);
> >> >
> >> > Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamMap =
> >> >         consumerConnector.createMessageStreams(topicMap);
> >> >
> >> > I have tested this with several partition counts, e.g. 1, 2, and 5.
> >> >
> >> > Whenever the consumer gets stuck, I have to kill the jar and start it
> >> > again. This happens only occasionally, i.e. 2 or 3 times out of 10;
> >> > the rest of the time, the consumer starts fine.
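
As a stopgap until the cause is known, a startup call that may hang can be run on a worker thread with a deadline, so the application can log the failure and decide what to do instead of being killed by hand. The sketch below uses only `java.util.concurrent`; `StartupGuard` and `callWithTimeout` are hypothetical names, and the stand-in task in `main` takes the place of the real `createMessageStreams(topicMap)` call, which is not exercised here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StartupGuard {
    /**
     * Runs a blocking step on a worker thread and waits at most timeoutMs for it.
     * Returns the step's result, or rethrows TimeoutException after interrupting the worker.
     */
    static <T> T callWithTimeout(Callable<T> step, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(step);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupts the worker; a step that ignores interrupts will keep running.
            future.cancel(true);
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the real call; in the consumer this would wrap createMessageStreams(topicMap).
        String result = callWithTimeout(new Callable<String>() {
            public String call() {
                return "streams created";
            }
        }, 30000L);
        System.out.println(result);
    }
}
```

On timeout it is worth taking a thread dump before retrying, since the worker's stack shows where the call was blocked. Note that the worker is a non-daemon thread, so a step that never responds to interruption can still keep the JVM alive.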
> >> >
> >> > Is there some configuration property that I may be missing that is
> >> > causing this inconsistent behaviour?
> >> >
> >> >
> >> > Any guidance would be of great help.
> >> >
> >> > Thanks
> >> > Tarang Dawer
> >> >
> >>
> >
> >
>
