kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajasekar Elango <rela...@salesforce.com>
Subject Re: Mirrormaker stopped consuming
Date Tue, 10 Sep 2013 22:21:50 GMT
We have about 4 topics that are actively consumed each with 8 partitions. I
am able to see all topics/partitions and its leaders via list topics
command. But now the mirrormaker is restarted to restore the problem. I
will double check leaders for partitions when the problem happen again.


On Tue, Sep 10, 2013 at 6:12 PM, Sriram Subramanian <
srsubramanian@linkedin.com> wrote:

> Could you provide the topic/partition and also a dump of the zookeeper
> state? One possibility is if we never elected a leader for a topic
> partition.
>
> On 9/10/13 3:07 PM, "Rajasekar Elango" <relango@salesforce.com> wrote:
>
> >I did more investigation to confirm if this could be really due to jira
> >issue https://issues.apache.org/jira/browse/KAFKA-937 . But all thread
> >dumps I got did not had any dead locks for even blocked threads. All
> >threads are pretty much in RUNNABLE state. I consistently see
> >"kafka.common.NotLeaderForPartitionException"
> >in mirrormaker logs every time this happens. Could
> >NotLeaderForPartitionException
> >cause consumer threads to stop consuming..? What are the debugging steps
> >we
> >can try to identify root cause of this problem?
> >
> >Thanks,
> >Raja.
> >
> >
> >On Thu, Sep 5, 2013 at 7:33 PM, Rajasekar Elango
> ><relango@salesforce.com>wrote:
> >
> >> This jira seem to be resolved very recently. We are using snapshot of
> >>0.8
> >> release from beginning of june.
> >>
> >> Thanks,
> >> Raja.
> >>
> >>
> >> On Thu, Sep 5, 2013 at 1:49 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> >>
> >>> Your threaddump looks similar to the deadlock in
> >>> https://issues.apache.org/jira/browse/KAFKA-937 I thought that fix was
> >>> included in the beta release - which version of Kafka are you using?
> >>>
> >>>
> >>> On Thu, Sep 5, 2013 at 8:35 AM, Rajasekar Elango
> >>><relango@salesforce.com>
> >>> wrote:
> >>> > Thanks , I am working on tuning GC options. This happened again today
> >>> and
> >>> > mirrormaker stopped consuming. This time the last exception was:
> >>> >
> >>> > 2013-09-05 07:01:52,931
> >>> >
> >>>
> >>>[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-2]
> >>> > WARN  (kafka.consumer.ConsumerFetcherThread)  -
> >>> >
> >>>
> >>>[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-2],
> >>> > error for partition [jmx,5] to broker
> >>> > 2kafka.common.NotLeaderForPartitionException
> >>> >         at
> >>> sun.reflect.GeneratedConstructorAccessor18.newInstance(Unknown Source)
> >>> >         at
> >>>
> >>>sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingCons
> >>>tructorAccessorImpl.java:27)
> >>> >         at
> >>> java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> >>> >         at java.lang.Class.newInstance0(Class.java:355)
> >>> >         at java.lang.Class.newInstance(Class.java:308)
> >>> >         at
> >>> kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonf
> >>>un$apply$5.apply(AbstractFetcherThread.scala:158)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonf
> >>>un$apply$5.apply(AbstractFetcherThread.scala:158)
> >>> >         at kafka.utils.Logging$class.warn(Logging.scala:88)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply
> >>>AbstractFetcherThread.scala:157)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread$$anonfun$processFetchReuest$4.apply(
> >>>AbstractFetcherThread.scala:113)
> >>> >         at
> >>> scala.collectionimmutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> >>> >         at
> >>>
> >>>scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347
> >>>)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
> >>>read.scala:113)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89
> >>>)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(Shutdownablehread.scala:51)
> >>> >
> >>> >
> >>> >
> >>> > I also captured thread dump and pasting it below. Can you confirm if
> >>> > this could again due to Full GC or something else that need to be
> >>> > looked at.
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > ull thread dump Java HotSpot(TM) 64-Bit Server VM (17.0-b16 mixed
> >>>mode):
> >>> >
> >>> > "Attach Listener" daemon prio=10 tid=0x00007f539c001800 nid=0x6b49
> >>> > waiting on condition [0x0000000000000000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >
> >>> >
> >>>
> >>>"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-3"
> >>> > prio=10 tid=0x00007f52f800c000 nid=0x5e03 waiting on condition
> >>> > [0x00007f53b46cf000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14f5a0> (a
> >>> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterr
> >>>upt(AbstractQueuedSynchronizer.java:811)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Abst
> >>>ractueuedSynchronizer.java:842)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronize.acquire(AbstractQu
> >>>euedSynchronizer.java:1178)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.
> >>>java:186)
> >>> >         at
> >>> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFet
> >>>cherManager.scala:168)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(Consumer
> >>>FetcherThread.scala:70)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
> >>>read.scala:171)
> >>> >        at
> >>>
> >>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89
> >>>)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>> >
> >>> >
> >>>
> >>>"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-2"
> >>> > prio=10 tid=0x00007f52f8006800 nid=0x502 waiting on condition
> >>> > [0x00007f53b43cc000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14f5a0> (a
> >>> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterr
> >>>upt(AbstractQueuedSynchronizer.java:811)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Abst
> >>>ractQueuedSynchronizer.java:842)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQu
> >>>euedSynchronizer.java:1178)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.
> >>>java:186)
> >>> >         at
> >>> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFet
> >>>cherManager.scala:168)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(Consumer
> >>>FetcherThread.scala:70)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
> >>>read.scala:171)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89
> >>>)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>> >
> >>> >
> >>>
> >>>"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-1"
> >>> > prio=10 tid=0x00007f52f8004800 nid=0x3d8c waiting on condition
> >>> > [0x00007f53b57e0000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14f5a0> (a
> >>> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterr
> >>>upt(AbstractQueuedSynchronizer.java:811)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Abst
> >>>ractQueuedSynchronizer.java:842)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQu
> >>>euedSynchronizer.java:1178)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.
> >>>java:186)
> >>> >         at
> >>> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFet
> >>>cherManager.scala:168)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(Consumer
> >>>FetcherThread.scala:70)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
> >>>read.scala:171)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89
> >>>)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>> >
> >>> >
> >>>
> >>>"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-4"
> >>> > prio=10 tid=0x00007f52f800a800 nid=0x5b14 runnable
> >>> > [0x00007f53b47d0000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at sun.nio.ch.FileDispatcher.read0(Native Method)
> >>> >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> >>> >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:237)
> >>> >         at sun.nio.ch.IOUtil.read(IOUtil.java:210)
> >>> >         at
> >>>sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
> >>> >         - locked <0x00007f541d1e95f8> (a java.lang.Object)
> >>> >         at
> >>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:42)
> >>> >         - locked <0x00007f541d1e9608> (a java.lang.Object)>>> >
>     at
> >>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:92)
> >>> >         at
> >>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >>> >         - locked <0x00007f541d1e9618>(a
> >>>sun.nio.ch.ChannelInputStream)
> >>> >         at
> >>>
> >>>java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:22
> >>>1)
> >>> >         - locked <0x00007f541d1e9690> (a java.lang.Object)
> >>> >         at kafka.utils.Utils$.read(Utils.scala:394)
> >>> >         at
> >>>
> >>>kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive
> >>>.scala:54)
> >>> >         at
> >>> kafka.network.Receive$class.readCompletely(Transission.scala:56)
> >>> >         at
> >>>
> >>>kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferR
> >>>eceive.scala:29)
> >>> >         at
> >>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:103)
> >>> >         at
> >>> kafka.consumer.SimpleConsumer.lifteTree1$1(SimpleConsumer.scala:85)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest
> >>>(SimpleConsumer.scala:83)
> >>> >         - locked <0x00007f541d1e96a0> (a java.lang.Object)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetc$1$$anonfun$apply$mcV$sp$1.a
> >>>pply$mcV$sp(SimpleConsumer.scala:122)
> >>> >         at
> >>>
> >>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.a
> >>>pply(SimpleConsumer.scala:122)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.a
> >>>pply(SimpleConsumer.scala:122)
> >>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsum
> >>>er.scala:121)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scal
> >>>a:121)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scal
> >>>a:121)
> >>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>> >         at
> >>>kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
> >>>read.scala:97)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89
> >>>)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>> >
> >>> >
> >>>
> >>>"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378
> >>>308858270-7a2f6553-0-5"
> >>> > prio=10 tid=0x00007f52f800a000 nid=0x5b13 runnable
> >>> > [0x00007f53b45ce000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at sun.nio.ch.FileDispatcher.read0(Native Method)
> >>> >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> >>> >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:237)
> >>> >         at sun.nio.ch.IOUtil.read(IOUtil.java:210)
> >>> >         at
> >>>sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
> >>> >         - locked <0x00007f541d1e7910> (a java.lang.Object)
> >>> >         at
> >>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:42)
> >>> >         - locked <0x00007f541d1e7920> (a java.lang.Object)
> >>> >         at
> >>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:92)
> >>> >         at
> >>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >>> >         - locked <0x00007f541d1e7930> (a
> >>>sun.nio.ch.ChannelInputStream)
> >>> >         at
> >>>
> >>>java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:22
> >>>1)
> >>> >         - locked <0x00007f541d1e79a8> (a java.lang.Object)
> >>> >         at kafka.utils.Utils$.read(Utils.scala:394)
> >>> >         at
> >>>
> >>>kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive
> >>>.scala:54)
> >>> >         at
> >>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>> >         at
> >>>
> >>>kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferR
> >>>eceive.scala:29)
> >>> >         at
> >>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:103)
> >>> >         at
> >>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest
> >>>(SimpleConsumer.scala:83)
> >>> >         - locked <0x00007f541d1e79b8> (a java.lang.Object)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.a
> >>>pply$mcV$sp(SimpleConsumer.scala:122)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.a
> >>>pply(SimpleConsumer.scala:122)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.a
> >>>pply(SimpleConsumer.scala:122)
> >>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsum
> >>>er.scala:121)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scal
> >>>a:121)
> >>> >         at
> >>>
> >>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scal
> >>>a:121)
> >>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>> >         at
> >>>kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherTh
> >>>read.scala:97)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89
> >>>)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>> >
> >>> >
> >>>
> >>>"mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-lea
> >>>der-finder-thread"
> >>> > prio=10 tid=0x00007f534800c800 nid=0x5b11 waiting on condition
> >>> > [0x00007f53b44cd000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d212d48> (a
> >>> > java.util.concurrent.CountDownLatch$Sync)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterr
> >>>upt(AbstractQueuedSynchronizer.java:811)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInt
> >>>erruptibly(AbstractQueuedSynchronizer.java:969)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInter
> >>>ruptibly(AbstractQueuedSynchronizer.java:1281)
> >>> >         at
> >>> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:
> >>>70)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$
> >>>2.apply(AbstractFetcherManager.scala:68)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$
> >>>2.apply(AbstractFetcherManager.scala:66)
> >>> >         at
> >>>
> >>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:
> >>>95)
> >>> >         at
> >>>
> >>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:
> >>>95)
> >>> >         at
> >>>scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >>> >         at
> >>> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> >>> >         at
> >>>
> >>>scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:19
> >>>0)
> >>> >         at
> >>> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> >>> >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> >>> >         at
> >>>
> >>>kafka.server.AbstractFetcherManager.shutdownIdleFetcherThreads(AbstractF
> >>>etcherManager.scala:66)
> >>> >         - locked <0x00007f541d151740> (a java.lang.Object)
> >>> >         at
> >>>
> >>>kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(Consumer
> >>>FetcherManager.scala:107)
> >>> >         at
> >>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>> >
> >>> > "ProducerThread-8" prio=10 tid=0x00007f582041c800 nid=0x16e7 waiting
> >>> > on condition [0x00007f53b48d1000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-7" prio=10 tid=0x00007f582041a800 nid=0x16e6 waiting
> >>> > on condition [0x00007f53b49d2000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-6" prio=10 tid=0x00007f5820418800 nid=0x16e5 waiting
> >>> > on condition [0x00007f53b4ad3000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-5" prio=10 tid=0x00007f5820416800 nid=0x16e4 waiting
> >>> > on condition [0x00007f53b4bd4000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-4" prio=10 tid=0x00007f5820414800 nid=0x16e3 waiting
> >>> > on condition [0x00007f53b4cd5000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-3" prio=10 tid=0x00007f5820412800 nid=0x16e2 waiting
> >>> > on condition [0x00007f53b4dd6000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-2" prio=10 tid=0x00007f5820410800 nid=0x16e1 waiting
> >>> > on condition [0x00007f53b4ed7000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "ProducerThread-1" prio=10 tid=0x00007f582040f000 nid=0x16e0 waiting
> >>> > on condition [0x00007f53b4fd8000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19ffb8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317
> >>>)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaM
> >>>igrationTool.java:296)
> >>> >         at
> >>>
> >>>kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.jav
> >>>a:387)
> >>> >
> >>> > "mirrormaker-3" prio=10 tid=0x00007f582040d800 nid=0x16df waiting on
> >>> > condition [0x00007f53b50d9000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d0fbe00> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >>> >         at
> >>>
> >>>kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
> >>> >         at
> >>>scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> >>> >         at
> >>> scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> >>> >         at
> >>> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
> >>> >
> >>> > "mirrormaker-2" prio=10 tid=0x00007f582040c800 nid=0x16de waiting on
> >>> > condition [0x00007f53b51da000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d135d30> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >>> >         at
> >>>
> >>>kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
> >>> >         at
> >>>scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> >>> >         at
> >>> scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> >>> >         at
> >>> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
> >>> >
> >>> > "mirrormaker-1" prio=10 tid=0x00007f582040c000 nid=0x16dd waiting on
> >>> > condition [0x00007f53b52db000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d113fb0> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >>> >         at
> >>>
> >>>kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
> >>> >         at
> >>>scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> >>> >         at
> >>> scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> >>> >         at
> >>> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
> >>> >
> >>> > "mirrormaker-0" prio=10 tid=0x00007f582040b800 nid=0x16dc waiting on
> >>> > condition [0x00007f53b53dc000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d186aa8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
> >>> >         at
> >>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >>> >         at
> >>>
> >>>kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
> >>> >         at
> >>>scala.collection.Iterator$class.foreach(Iterator.scala:772)
> >>> >         at
> >>> kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> >>> >         at
> >>> scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> >>> >         at
> >>> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
> >>> >
> >>> > "main-EventThread" daemon prio=10 tid=0x00007f58203e9800 nid=0x16db
> >>> > waiting on condition [0x00007f53b54dd000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14cc38> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>> org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
> >>> >
> >>> > "main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)" daemon
> >>> > prio=10 tid=0x00007f58203e8800 nid=0x16da runnable
> >>> > [0x00007f53b55de000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>> >         at
> >>>sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> >>> >         at
> >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> >>> >         at
> >>>sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> >>> >         - locked <0x00007f541d133788> (a sun.nio.ch.Util$1)
> >>> >         - locked <0x00007f541d1337a0> (a
> >>> java.util.Collections$UnmodifiableSet)
> >>> >         - locked <0x00007f541d133710> (a
> >>>sun.nio.ch.EPollSelectorImpl)
> >>> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> >>> >         at
> >>> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1134)
> >>> >
> >>> > "ZkClient-EventThread-29-mandm-zookeeper-asg.data.sfdc.net:2181
> >>> >
> >>><http://zkclient-eventthread-29-mandm-zookeeper-asg.data.sfdc.net:2181/
> >>> >"
> >>> > daemon prio=10 tid=0x00007f58203b8000 nid=0x16d9 waiting on condition
> >>> > [0x00007f53b56df000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14cdc8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>>org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
> >>> >
> >>> >
> >>>
> >>>"mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553_wat
> >>>cher_executor"
> >>> > prio=10 tid=0x00007f5820343000 nid=0x16d7 waiting on condition
> >>> > [0x00007f53b58e1000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14cf58> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:2116)
> >>> >         at
> >>>
> >>>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.r
> >>>un(ZookeeperConsumerConnector.scala:319)
> >>> >
> >>> > "Kafka-consumer-autocommit-1" prio=10 tid=0x00007f58203aa000
> >>> > nid=0x16d6 waiting on condition [0x00007f53b59e2000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d159288> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> >>> >         at
> >>>
> >>>java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(S
> >>>cheduledThreadPoolExecutor.java:583)
> >>> >         at
> >>>
> >>>java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(S
> >>>cheduledThreadPoolExecutor.java:576)
> >>> >         at
> >>>
> >>>java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:
> >>>947)
> >>> >         at
> >>>
> >>>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.ja
> >>>va:907)
> >>> >         at java.lang.Thread.run(Thread.java:619)
> >>> >
> >>> > "main-EventThread" daemon prio=10 tid=0x00007f5820339000 nid=0x16d5
> >>> > waiting on condition [0x00007f53b5ae3000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d158190> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>> org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
> >>> >
> >>> > "main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)" daemon
> >>> > prio=10 tid=0x00007f582034b000 nid=0x16d4 runnable
> >>> > [0x00007f53b5be4000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>> >         at
> >>>sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> >>> >         at
> >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> >>> >         at
> >>>sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> >>> >         - locked <0x00007f541d157c10> (a sun.nio.ch.Util$1)
> >>> >         - locked <0x00007f541d157bf8> (a
> >>> java.util.Collections$UnmodifiableSet)
> >>> >         - locked <0x00007f541d157510> (a
> >>>sun.nio.ch.EPollSelectorImpl)
> >>> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> >>> >         at
> >>> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1134)
> >>> >
> >>> > "ZkClient-EventThread-23-mandm-zookeeper-asg.data.sfdc.net:2181
> >>> >
> >>><http://zkclient-eventthread-23-mandm-zookeeper-asg.data.sfdc.net:2181/
> >>> >"
> >>> > daemon prio=10 tid=0x00007f5820338800 nid=0x16d3 waiting on condition
> >>> > [0x00007f53b5ded000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d15d868> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>ait(AbstractQueuedSynchronizer.java:1987)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:3
> >>>99)
> >>> >         at
> >>>org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f5820334000 nid=0x16d1
> >>> > waiting on condition [0x00007f53b5eee000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d1a0938> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf99f890> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf99f8d8> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f5820332800 nid=0x16d0
> >>> > waiting on condition [0x00007f53b5fef000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d17d138> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf983c78> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf983cc0> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f5820331000 nid=0x16cf
> >>> > waiting on condition [0x00007f53b60f0000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d1846e0> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cff19708> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cff19750> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f582032f800 nid=0x16ce
> >>> > waiting on condition [0x00007f53b61f1000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d1a06f0> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf982c20> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf982c68> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f582032c000 nid=0x16cd
> >>> > waiting on condition [0x00007f53b62f2000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d175c98> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf9a3350> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf9a3398> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f582032a800 nid=0x16cc
> >>> > waiting on condition [0x00007f53b63f3000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d135cd8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf984498> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf9844e0> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f58202ef800 nid=0x16cb
> >>> > waiting on condition [0x00007f53b64f4000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d19f428> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf9a2b30> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf9a2b78> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "ProducerSendThread-" prio=10 tid=0x00007f58202d4800 nid=0x16ca
> >>> > waiting on condition [0x00007f53b65f5000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d1a0990> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at
> >>>
> >>>java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:4
> >>>24)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(P
> >>>roducerSendThread.scala:65)
> >>> >         at
> >>> scala.collection.immutable.Stream$.continually(Stream.scala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.sc
> >>>ala:1104)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf99f070> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>>
> >>>scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scal
> >>>a:782)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
> >>> >         - locked <0x00007f56cf99f0b8> (a
> >>> scala.collection.immutable.Stream$Cons)
> >>> >         at
> >>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
> >>> >         at
> >>>scala.collection.immutable.Stream.foreach(Stream.scala:527)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread
> >>>.scala:66)
> >>> >         at
> >>>
> >>>kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>> >
> >>> > "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007f58202d6800
> >>> > nid=0x16c9 waiting on condition [0x00007f53b66f6000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d0f73f8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> >>> >         at
> >>>
> >>>java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(S
> >>>cheduledThreadPoolExecutor.java:583)
> >>> >         at
> >>>
> >>>java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(S
> >>>cheduledThreadPoolExecutor.java:576)
> >>> >         at
> >>>
> >>>java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:
> >>>947)
> >>> >         at
> >>>
> >>>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.ja
> >>>va:907)
> >>> >         at java.lang.Thread.run(Thread.java:619)
> >>> >
> >>> > "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007f5820306800
> >>> > nid=0x16c8 waiting on condition [0x00007f53b67f7000]
> >>> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d0f73f8> (a
> >>> >
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.aw
> >>>aitNanos(AbstractQueuedSynchronizer.java:2025)
> >>> >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
> >>> >         at
> >>>
> >>>java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(S
> >>>cheduledThreadPoolExecutor.java:583)
> >>> >         at
> >>>
> >>>java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(S
> >>>cheduledThreadPoolExecutor.java:576)
> >>> >         at
> >>>
> >>>java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:
> >>>947)
> >>> >         at
> >>>
> >>>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.ja
> >>>va:907)
> >>> >         at java.lang.Thread.run(Thread.java:619)
> >>> >
> >>> > "RMI TCP Accept-0" daemon prio=10 tid=0x00007f58202ab800 nid=0x16c6
> >>> > runnable [0x00007f53b69f9000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at java.net.PlainSocketImpl.socketAccept(Native Method)
> >>> >         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
> >>> >         - locked <0x00007f541d0f77d0> (a java.net.SocksSocketImpl)
> >>> >         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
> >>> >         at java.net.ServerSocket.accept(ServerSocket.java:421)
> >>> >         at
> >>>
> >>>sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMISe
> >>>rverSocketFactory.java:34)
> >>> >         at
> >>>
> >>>sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTrans
> >>>port.java:369)
> >>> >         at
> >>>
> >>>sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
> >>> >         at java.lang.Thread.run(Thread.java:619)
> >>> >
> >>> > "RMI TCP Accept-6665" daemon prio=10 tid=0x00007f58202a6000
> >>>nid=0x16c5
> >>> > runnable [0x00007f53b6afa000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at java.net.PlainSocketImpl.socketAccept(Native Method)
> >>> >         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
> >>> >         - locked <0x00007f541d116970> (a java.net.SocksSocketImpl)
> >>> >         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
> >>> >         at java.net.ServerSocket.accept(ServerSocket.java:421)
> >>> >         at
> >>>
> >>>sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTrans
> >>>port.java:369)
> >>> >         at
> >>>
> >>>sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
> >>> >         at java.lang.Thread.run(Thread.java:619)
> >>> >
> >>> > "RMI TCP Accept-0" daemon prio=10 tid=0x00007f5820295000 nid=0x16c4
> >>> > runnable [0x00007f53b6bfb000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >         at java.net.PlainSocketImpl.socketAccept(Native Method)
> >>> >         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
> >>> >         - locked <0x00007f541d167190> (a java.net.SocksSocketImpl)
> >>> >         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
> >>> >         at java.net.ServerSocket.accept(ServerSocket.java:421)
> >>> >         at
> >>>
> >>>sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTrans
> >>>port.java:369)
> >>> >         at
> >>>
> >>>sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
> >>> >         at java.lang.Thread.run(Thread.java:619)
> >>> >
> >>> > "Low Memory Detector" daemon prio=10 tid=0x00007f58200b8000
> >>>nid=0x16c3
> >>> > runnable [0x0000000000000000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >
> >>> > "CompilerThread1" daemon prio=10 tid=0x00007f58200b6000 nid=0x16c2
> >>> > waiting on condition [0x0000000000000000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >
> >>> > "CompilerThread0" daemon prio=10 tid=0x00007f58200b3000 nid=0x16c1
> >>> > waiting on condition [0x0000000000000000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >
> >>> > "Signal Dispatcher" daemon prio=10 tid=0x00007f58200b1000 nid=0x16c0
> >>> > runnable [0x0000000000000000]
> >>> >    java.lang.Thread.State: RUNNABLE
> >>> >
> >>> > "Finalizer" daemon prio=10 tid=0x00007f5820092800 nid=0x16bf in
> >>> > Object.wait() [0x00007f53b77f6000]
> >>> >    java.lang.Thread.State: WAITING (on object monitor)
> >>> >         at java.lang.Object.wait(Native Method)
> >>> >         - waiting on <0x00007f541d0fa430> (a
> >>> java.lang.ref.ReferenceQueue$Lock)
> >>> >         at
> >>>java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
> >>> >         - locked <0x00007f541d0fa430> (a
> >>> java.lang.ref.ReferenceQueue$Lock)
> >>> >         at
> >>>java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
> >>> >         at
> >>> java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
> >>> >
> >>> > "Reference Handler" daemon prio=10 tid=0x00007f5820090800 nid=0x16be
> >>> > in Object.wait() [0x00007f53b78f7000]
> >>> >    java.lang.Thread.State: WAITING (on object monitor)
> >>> >         at java.lang.Object.wait(Native Method)
> >>> >         - waiting on <0x00007f541d11ea80> (a
> >>> java.lang.ref.Reference$Lock)
> >>> >         at java.lang.Object.wait(Object.java:485)
> >>> >         at
> >>> java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
> >>> >         - locked <0x00007f541d11ea80> (a
> >>>java.lang.ref.Reference$Lock)
> >>> >
> >>> > "main" prio=10 tid=0x00007f582000a000 nid=0x16a5 waiting on condition
> >>> > [0x00007f5826bb4000]
> >>> >    java.lang.Thread.State: WAITING (parking)
> >>> >         at sun.misc.Unsafe.park(Native Method)
> >>> >         - parking to wait for  <0x00007f541d14eaa8> (a
> >>> > java.util.concurrent.CountDownLatch$Sync)
> >>> >         at
> >>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterr
> >>>upt(AbstractQueuedSynchronizer.java:811)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInt
> >>>erruptibly(AbstractQueuedSynchronizer.java:969)
> >>> >         at
> >>>
> >>>java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInter
> >>>ruptibly(AbstractQueuedSynchronizer.java:1281)
> >>> >         at
> >>> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> >>> >         at
> >>>
> >>>kafka.tools.MirrorMaker$MirrorMakerThread.awaitShutdown(MirrorMaker.scal
> >>>a:191)
> >>> >         at
> >>> kafka.tools.MirrorMaker$$anonfun$main$9.apply(MirrorMaker.scala:159)
> >>> >         at
> >>> kafka.tools.MirrorMaker$$anonfun$main$9.apply(MirrorMaker.scala:159)
> >>> >         at
> >>>
> >>>scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.sca
> >>>la:59)
> >>> >         at scala.collection.immutable.List.foreach(List.scala:76)
> >>> >         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:159)
> >>> >         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> >>> >
> >>> > "VM Thread" prio=10 tid=0x00007f582008c800 nid=0x16bd runnable
> >>> >
> >>> > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f582001d000
> >>> > nid=0x16a6 runnable
> >>> >
> >>> > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f582001f000
> >>> > nid=0x16a7 runnable
> >>> >
> >>> > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f5820021000
> >>> > nid=0x16a8 runnable
> >>> >
> >>> > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f5820022800
> >>> > nid=0x16a9 runnable
> >>> >
> >>> > "GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f5820024800
> >>> > nid=0x16aa runnable
> >>> >
> >>> > "GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f5820026800
> >>> > nid=0x16ab runnable
> >>> >
> >>> > "GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f5820028000
> >>> > nid=0x16ac runnable
> >>> >
> >>> > "GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f582002a000
> >>> > nid=0x16ad runnable
> >>> >
> >>> > "GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f582002c000
> >>> > nid=0x16ae runnable
> >>> >
> >>> > "GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f582002d800
> >>> > nid=0x16af runnable
> >>> >
> >>> > "GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f582002f800
> >>> > nid=0x16b0 runnable
> >>> >
> >>> > "GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f5820031000
> >>> > nid=0x16b1 runnable
> >>> >
> >>> > "GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f5820033000
> >>> > nid=0x16b2 runnable
> >>> >
> >>> > "GC task thread#13 (ParallelGC)" prio=10 tid=0x00007f5820035000
> >>> > nid=0x16b3 runnable
> >>> >
> >>> > "GC task thread#14 (ParallelGC)" prio=10 tid=0x00007f5820036800
> >>> > nid=0x16b4 runnable
> >>> >
> >>> > "GC task thread#15 (ParallelGC)" prio=10 tid=0x00007f5820038800
> >>> > nid=0x16b5 runnable
> >>> >
> >>> > "GC task thread#16 (ParallelGC)" prio=10 tid=0x00007f582003a800
> >>> > nid=0x16b6 runnable
> >>> >
> >>> > "GC task thread#17 (ParallelGC)" prio=10 tid=0x00007f582003c000
> >>> > nid=0x16b7 runnable
> >>> >
> >>> > "GC task thread#18 (ParallelGC)" prio=10 tid=0x00007f582003e000
> >>> > nid=0x16b8 runnable
> >>> >
> >>> > "GC task thread#19 (ParallelGC)" prio=10 tid=0x00007f5820040000
> >>> > nid=0x16b9 runnable
> >>> >
> >>> > "GC task thread#20 (ParallelGC)" prio=10 tid=0x00007f5820041800
> >>> > nid=0x16ba runnable
> >>> >
> >>> > "GC task thread#21 (ParallelGC)" prio=10 tid=0x00007f5820043800
> >>> > nid=0x16bb runnable
> >>> >
> >>> > "GC task thread#22 (ParallelGC)" prio=10 tid=0x00007f5820045800
> >>> > nid=0x16bc runnable
> >>> >
> >>> > "VM Periodic Task Thread" prio=10 tid=0x00007f58202ae000 nid=0x16c7
> >>> > waiting on condition
> >>> >
> >>> > JNI global references: 1362
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Tue, Sep 3, 2013 at 9:24 PM, Neha Narkhede
> >>><neha.narkhede@gmail.com
> >>> >wrote:
> >>> >
> >>> >> 16 GB is a very large heap. GC tuning becomes trickier as the size
> >>>of
> >>> the
> >>> >> heap increases. Are you sure you need that much memory to operate
> >>>the
> >>> >> mirror maker? For us, the following GC settings have worked well -
> >>> >>
> >>> >>
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-
> >>>Java
> >>> >>
> >>> >> Thanks,
> >>> >> Neha
> >>> >>
> >>> >>
> >>> >> On Tue, Sep 3, 2013 at 10:40 AM, Rajasekar Elango <
> >>> relango@salesforce.com
> >>> >> >wrote:
> >>> >>
> >>> >> > Thanks Neha,
> >>> >> >
> >>> >> > I did not take a thread dump before restarting, will get it when
> >>>it
> >>> >> happens
> >>> >> > again. We are using 16 Gigs of jvm heap. Do you have a
> >>> recommendation on
> >>> >> > jvm GC options.?
> >>> >> >
> >>> >> > Thanks,
> >>> >> > Raja.
> >>> >> >
> >>> >> >
> >>> >> > On Tue, Sep 3, 2013 at 12:26 PM, Neha Narkhede <
> >>> neha.narkhede@gmail.com
> >>> >> > >wrote:
> >>> >> >
> >>> >> > > 2013-09-01 05:59:27,792 [main-EventThread] INFO
> >>> >> > >  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed
> >>> >> (Disconnected)
> >>> >> > > 2013-09-01 05:59:27,692 [main-SendThread(
> >>> >> > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
> >>> >> > >  (org.apache.zookeeper.
> >>> >> > > ClientCnxn)  - Client session timed out, have not
> >>> >> > > heard from server in 4002ms for sessionid 0x140c603da5b0032,
> >>> closing
> >>> >> > socket
> >>> >> > > connection and attempting reconnect
> >>> >> > >
> >>> >> > > This indicates that your mirror maker and/or your zookeeper
> >>> cluster is
> >>> >> > > GCing for long periods of time. I have observed that if "client
> >>> session
> >>> >> > > timed out" happens too many times, the client tends to lose
> >>> zookeeper
> >>> >> > > watches. This is a potential bug in zookeeper. If this happens,
> >>> your
> >>> >> > mirror
> >>> >> > > maker instance might not rebalance correctly and will start
> >>>losing
> >>> >> data.
> >>> >> > >
> >>> >> > > You mentioned consumption/production stopped on your mirror
> >>>maker,
> >>> >> could
> >>> >> > > you please take a thread dump and point us to it? Meanwhile, you
> >>> might
> >>> >> > want
> >>> >> > > to fix the GC pauses.
> >>> >> > >
> >>> >> > > Thanks,
> >>> >> > > Neha
> >>> >> > >
> >>> >> > >
> >>> >> > > On Tue, Sep 3, 2013 at 8:59 AM, Rajasekar Elango <
> >>> >> relango@salesforce.com
> >>> >> > > >wrote:
> >>> >> > >
> >>> >> > > > We found that mirrormaker stopped consuming and producing over
> >>> the
> >>> >> week
> >>> >> > > end
> >>> >> > > > (09/01). Just seeing "Client session timed out" messages in
> >>> >> mirrormaker
> >>> >> > > > log. I restarted to it today 09/03 to resume processing. Here
> >>>is
> >>> the
> >>> >> > logs
> >>> >> > > > line in reverse order.
> >>> >> > > >
> >>> >> > > >
> >>> >> > > > 2013-09-03 14:20:40,918
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.utils.VerifiableProperties)  - Verifying
> >>>properties
> >>> >> > > > 2013-09-03 14:20:40,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> >>> >> > > >
> >>> >>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> >>> >> > > > begin rebalancing consumer
> >>> >> > > >
> >>> mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506
> >>> >> try
> >>> >> > > #1
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> >>> >> > > >
> >>> >>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> >>> >> > > > Committing all offsets after clearing the fetcher queues
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> >>> >> > > >
> >>> >>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> >>> >> > > > Cleared the data chunks in all the consumer message iterators
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> >>> >> > > >
> >>> >>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> >>> >> > > > Cleared all relevant queues for this fetcher
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ConsumerFetcherManager)  -
> >>> >> > > > [ConsumerFetcherManager-1378218012760] All connections stopped
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ConsumerFetcherManager)  -
> >>> >> > > > [ConsumerFetcherManager-1378218012760] Stopping all fetchers
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ConsumerFetcherManager)  -
> >>> >> > > > [ConsumerFetcherManager-1378218012760] Stopping leader finder
> >>> thread
> >>> >> > > > 2013-09-03 14:20:38,877
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> >>> >> > > >
> >>> >>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> >>> >> > > > Rebalancing attempt failed. Clearing the cache before the next
> >>> >> > > rebalancing
> >>> >> > > > operation is triggered
> >>> >> > > > 2013-09-03 14:20:38,876
> >>> >> > > >
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >>
> >>>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_wat
> >>>cher_executor]
> >>> >> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> >>> >> > > >
> >>> >>
> >>>[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> >>> >> > > end
> >>> >> > > > rebalancing consumer
> >>> >> > > >
> >>> mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506
> >>> >> try
> >>> >> > > #0
> >>> >> > > > 2013-09-01 05:59:29,069 [main-SendThread(
> >>> >> > > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
> >>> >> > > >  (org.apache.zookeeper.ClientCnxn)  - Socket connection
> >>> established
> >>> >> to
> >>> >> > > > mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181,
> >>>initiating
> >>> >> > session
> >>> >> > > > 2013-09-01 05:59:29,069 [main-SendThread(
> >>> >> > > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
> >>> >> > > >  (org.apache.zookeeper.ClientCnxn)  - Opening socket
> >>>connection
> >>> to
> >>> >> > server
> >>> >> > > > mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181
> >>> >> > > > 2013-09-01 05:59:27,792 [main-EventThread] INFO
> >>> >> > > >  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed
> >>> >> > (Disconnected)
> >>> >> > > > 2013-09-01 05:59:27,692 [main-SendThread(
> >>> >> > > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
> >>> >> > > >  (org.apache.zookeeper.ClientCnxn)  - Client session timed
> >>>out,
> >>> have
> >>> >> > not
> >>> >> > > > heard from server in 4002ms for sessionid 0x140c603da5b0032,
> >>> closing
> >>> >> > > socket
> >>> >> > > > connection and attempting reconnect
> >>> >> > > >
> >>> >> > > >
> >>> >> > > > As you can see, no log lines appeared after 2013-09-01
> >>>05:59:29.
> >>> I
> >>> >> > > checked
> >>> >> > > > lag using consumerOffsetChecker and observed that log size and
> >>> lag is
> >>> >> > > > growing, but offset of mirrormaker remains same. We have two
> >>> >> > mirrormaker
> >>> >> > > > process running and both of them had same issue during same
> >>>time
> >>> >> > frame..
> >>> >> > > > Any hint on what could be problem..? How do we go about
> >>>trouble
> >>> >> > shooting
> >>> >> > > > this..?
> >>> >> > > >
> >>> >> > > > Thanks in advance..
> >>> >> > > >
> >>> >> > > > --
> >>> >> > > > Thanks,
> >>> >> > > > Raja.
> >>> >> > > >
> >>> >> > >
> >>> >> >
> >>> >> >
> >>> >> >
> >>> >> > --
> >>> >> > Thanks,
> >>> >> > Raja.
> >>> >> >
> >>> >>
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Thanks,
> >>> > Raja.
> >>>
> >>
> >>
> >>
> >> --
> >> Thanks,
> >> Raja.
> >>
> >
> >
> >
> >--
> >Thanks,
> >Raja.
>
>


-- 
Thanks,
Raja.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message