kafka-users mailing list archives

From Joel Koshy <jjkosh...@gmail.com>
Subject Re: Mirrormaker stopped consuming
Date Tue, 10 Sep 2013 23:15:50 GMT
Also, the "stopped consumption" issue is separate from the flurry of
NotLeaderForPartitionException errors that you saw. Those could point to
an issue with your brokers and/or ZooKeeper cluster, but the leader
changes would trigger the deadlock in KAFKA-937.
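The quoted dumps show the two sides of the wait: fetcher threads parked on a `ReentrantLock` inside `ConsumerFetcherManager.addPartitionsWithError` (reached after a partition error such as NotLeaderForPartition), and the leader-finder thread parked on a `CountDownLatch` inside `ShutdownableThread.shutdown`. The following is a minimal Java sketch of that cycle, not Kafka's code. All names are hypothetical, and it assumes (the dump does not show lock owners) that the leader-finder thread already holds the lock the fetchers are waiting for while it waits for a fetcher to stop. Timed waits replace the untimed ones so the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical model of the lock/latch cycle; not Kafka code.
 * The calling thread plays the leader finder, a worker thread plays a fetcher.
 */
public class FetcherDeadlockSketch {

    /** Returns true only if the simulated fetcher obtained the manager lock. */
    public static boolean fetcherGetsLock() throws InterruptedException {
        ReentrantLock managerLock = new ReentrantLock();       // the lock fetchers park on
        CountDownLatch leaderHoldsLock = new CountDownLatch(1);
        CountDownLatch fetcherStopped = new CountDownLatch(1);  // plays the shutdown latch
        boolean[] acquired = {false};

        Thread fetcher = new Thread(() -> {
            try {
                leaderHoldsLock.await();
                // Reporting a partition error needs the manager lock. An untimed
                // lock() would park forever here; tryLock lets the demo finish.
                if (managerLock.tryLock(200, TimeUnit.MILLISECONDS)) {
                    acquired[0] = true;
                    managerLock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                fetcherStopped.countDown(); // the fetcher's loop has exited
            }
        }, "simulated-fetcher");

        managerLock.lock(); // leader finder takes the lock first...
        try {
            fetcher.start();
            leaderHoldsLock.countDown();
            // ...then waits for the fetcher to stop while still holding it.
            // With untimed waits on both sides, neither thread could proceed.
            fetcherStopped.await(5, TimeUnit.SECONDS);
        } finally {
            managerLock.unlock();
        }
        fetcher.join();
        return acquired[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fetcher acquired lock: " + fetcherGetsLock());
    }
}
```

Run as-is, it prints `fetcher acquired lock: false`. With untimed `lock()` and `await()` in place of the timed calls, the two threads would wait on each other indefinitely, which is the shape of the WAITING (parking) fetcher and leader-finder stacks quoted below.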

Thanks,

Joel


On Tue, Sep 10, 2013 at 4:12 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-1"
> prio=10 tid=0x00007f52f8004800 nid=0x3d8c waiting on condition
> [0x00007f53b57e0000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14f5a0> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> "mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-leader-finder-thread"
> prio=10 tid=0x00007f534800c800 nid=0x5b11 waiting on condition
> [0x00007f53b44cd000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d212d48> (a
> java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:70)
>         at kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$2.apply(AbstractFetcherManager.scala:68)
>         at kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$2.apply(AbstractFetcherManager.scala:66)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>         at kafka.server.AbstractFetcherManager.shutdownIdleFetcherThreads(AbstractFetcherManager.scala:66)
>         - locked <0x00007f541d151740> (a java.lang.Object)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:107)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> This is what I saw in your thread dump, and these threads are not
> RUNNABLE. I think what you mean is that they are not waiting on the same
> condition; however, the deadlock is very subtle and not easily seen in the
> dump. The JIRA describes the sequence that causes it.
>
> The fix for KAFKA-937 was made in mid-June, so your snapshot revision,
> which you said is from early June, may not have the fix.
>
> Thanks,
>
> Joel
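A general way to break a cycle of this shape is to decide which fetchers to stop while holding the lock, but to block on their shutdown only after releasing it. The sketch below illustrates that ordering with a hypothetical `Fetcher` interface and manager class; it is not necessarily what the KAFKA-937 patch does, so the JIRA remains the reference for the actual change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical manager illustrating "decide under the lock, block outside it". Not Kafka code. */
public class IdleFetcherShutdownSketch {

    /** Hypothetical stand-in for a fetcher thread. */
    public interface Fetcher {
        boolean isIdle();
        void shutdown() throws InterruptedException; // may block until the fetcher's loop exits
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final List<Fetcher> fetchers = new ArrayList<>();

    public void register(Fetcher f) {
        lock.lock();
        try {
            fetchers.add(f);
        } finally {
            lock.unlock();
        }
    }

    /** Removes idle fetchers while holding the lock, but waits for them to stop only after releasing it. */
    public int shutdownIdleFetchers() throws InterruptedException {
        List<Fetcher> idle = new ArrayList<>();
        lock.lock();
        try {
            for (Fetcher f : fetchers) {
                if (f.isIdle()) idle.add(f);
            }
            fetchers.removeAll(idle);
        } finally {
            lock.unlock();
        }
        // The lock is free here, so a fetcher that needs it to finish can still take it.
        for (Fetcher f : idle) f.shutdown();
        return idle.size();
    }

    /** Demo: an idle fetcher whose shutdown requires another thread to take the manager lock. */
    public static boolean demo() throws InterruptedException {
        IdleFetcherShutdownSketch manager = new IdleFetcherShutdownSketch();
        boolean[] gotLock = {false};
        manager.register(new Fetcher() {
            public boolean isIdle() { return true; }
            public void shutdown() throws InterruptedException {
                Thread loop = new Thread(() -> {
                    // Would fail if shutdown() were called while the manager held the lock.
                    if (manager.lock.tryLock()) {
                        gotLock[0] = true;
                        manager.lock.unlock();
                    }
                });
                loop.start();
                loop.join();
            }
        });
        int stopped = manager.shutdownIdleFetchers();
        return stopped == 1 && gotLock[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("idle fetcher shut down without contention: " + demo());
    }
}
```

`demo()` returns true because the simulated fetcher can take the manager lock while `shutdownIdleFetchers` waits for it. Moving the `f.shutdown()` loop inside the locked section would make that `tryLock()` fail, which is the ordering the dump suggests.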
>
>
>
> On Tue, Sep 10, 2013 at 3:07 PM, Rajasekar Elango
> <relango@salesforce.com> wrote:
>> I did more investigation to confirm whether this could really be due to
>> https://issues.apache.org/jira/browse/KAFKA-937. But none of the thread
>> dumps I got showed any deadlocks or even blocked threads. All threads are
>> pretty much in the RUNNABLE state. I consistently see
>> "kafka.common.NotLeaderForPartitionException" in the mirrormaker logs
>> every time this happens. Could NotLeaderForPartitionException cause
>> consumer threads to stop consuming? What debugging steps can we try to
>> identify the root cause of this problem?
>>
>> Thanks,
>> Raja.
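On the debugging question: the dumps quoted here show which lock each parked thread waits for (for example `<0x00007f541d14f5a0>`), but not which thread owns that `ReentrantLock`. `jstack -l` additionally prints each thread's "Locked ownable synchronizers", and the same information is available programmatically through `java.lang.management.ThreadMXBean`. A minimal sketch (class and method names are hypothetical) that lists waiting threads together with the owner the JVM reports for what they wait on:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Lists blocked/waiting threads and, where the JVM knows it, who owns what they wait on. */
public class LockOwnerReport {

    public static List<String> report() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Also collect held monitors and ownable synchronizers when the JVM supports it.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(), mx.isSynchronizerUsageSupported());
        List<String> lines = new ArrayList<>();
        for (ThreadInfo ti : infos) {
            Thread.State s = ti.getThreadState();
            if (s != Thread.State.BLOCKED && s != Thread.State.WAITING
                    && s != Thread.State.TIMED_WAITING) {
                continue; // only threads that are stuck on something
            }
            String owner = ti.getLockOwnerName() == null ? "(no owner reported)" : ti.getLockOwnerName();
            lines.add(ti.getThreadName() + " " + s + " on " + ti.getLockName() + " owned by " + owner);
        }
        return lines;
    }

    /** Demo: park a thread on a lock held by the caller and return that thread's report line. */
    public static String demo() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock(); // parks until the caller unlocks
            lock.unlock();
        }, "demo-waiter");
        waiter.start();
        try {
            while (waiter.getState() != Thread.State.WAITING) {
                Thread.sleep(10); // wait until the waiter is actually parked
            }
            for (String line : report()) {
                if (line.startsWith("demo-waiter ")) return line;
            }
            return null;
        } finally {
            lock.unlock();
            waiter.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Called from the main thread, `demo()` returns a line along the lines of `demo-waiter WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@... owned by main`. Run inside the mirrormaker JVM, `report()` would show which thread owns the lock the fetchers are parked on.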
>>
>>
>> On Thu, Sep 5, 2013 at 7:33 PM, Rajasekar Elango <relango@salesforce.com> wrote:
>>
>>> This JIRA seems to have been resolved very recently. We are using a
>>> snapshot of the 0.8 release from the beginning of June.
>>>
>>> Thanks,
>>> Raja.
>>>
>>>
>>> On Thu, Sep 5, 2013 at 1:49 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
>>>
>>>> Your thread dump looks similar to the deadlock in
>>>> https://issues.apache.org/jira/browse/KAFKA-937. I thought that fix was
>>>> included in the beta release; which version of Kafka are you using?
>>>>
>>>>
>>>> On Thu, Sep 5, 2013 at 8:35 AM, Rajasekar Elango <relango@salesforce.com>
>>>> wrote:
>>>> > Thanks, I am working on tuning GC options. This happened again today and
>>>> > mirrormaker stopped consuming. This time the last exception was:
>>>> >
>>>> > 2013-09-05 07:01:52,931
>>>> > [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-2]
>>>> > WARN  (kafka.consumer.ConsumerFetcherThread)  -
>>>> > [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-2],
>>>> > error for partition [jmx,5] to broker 2
>>>> > kafka.common.NotLeaderForPartitionException
>>>> >         at sun.reflect.GeneratedConstructorAccessor18.newInstance(Unknown Source)
>>>> >         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>>>> >         at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>>>> >         at java.lang.Class.newInstance0(Class.java:355)
>>>> >         at java.lang.Class.newInstance(Class.java:308)
>>>> >         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
>>>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
>>>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
>>>> >         at kafka.utils.Logging$class.warn(Logging.scala:88)
>>>> >         at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
>>>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
>>>> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
>>>> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
>>>> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>>>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
>>>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> >
>>>> >
>>>> > I also captured a thread dump and am pasting it below. Can you confirm
>>>> > whether this could again be due to a full GC, or whether it is something
>>>> > else that needs to be looked at?
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > Full thread dump Java HotSpot(TM) 64-Bit Server VM (17.0-b16 mixed mode):
>>>> >
>>>> > "Attach Listener" daemon prio=10 tid=0x00007f539c001800 nid=0x6b49
>>>> > waiting on condition [0x0000000000000000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >
>>>> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-3"
>>>> > prio=10 tid=0x00007f52f800c000 nid=0x5e03 waiting on condition
>>>> > [0x00007f53b46cf000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14f5a0> (a
>>>> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>>>> >         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>>>> >         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>>>> >         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>>>> >         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>>>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>>>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-2"
>>>> > prio=10 tid=0x00007f52f8006800 nid=0x5e02 waiting on condition
>>>> > [0x00007f53b43cc000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14f5a0> (a
>>>> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>>>> >         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>>>> >         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>>>> >         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>>>> >         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>>>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>>>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-1"
>>>> > prio=10 tid=0x00007f52f8004800 nid=0x3d8c waiting on condition
>>>> > [0x00007f53b57e0000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14f5a0> (a
>>>> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>>>> >         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>>>> >         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>>>> >         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>>>> >         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>>>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>>>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-4"
>>>> > prio=10 tid=0x00007f52f800a800 nid=0x5b14 runnable
>>>> > [0x00007f53b47d0000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at sun.nio.ch.FileDispatcher.read0(Native Method)
>>>> >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>>>> >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:237)
>>>> >         at sun.nio.ch.IOUtil.read(IOUtil.java:210)
>>>> >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
>>>> >         - locked <0x00007f541d1e95f8> (a java.lang.Object)
>>>> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:42)
>>>> >         - locked <0x00007f541d1e9608> (a java.lang.Object)
>>>> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:92)
>>>> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>>>> >         - locked <0x00007f541d1e9618> (a sun.nio.ch.ChannelInputStream)
>>>> >         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>>>> >         - locked <0x00007f541d1e9690> (a java.lang.Object)
>>>> >         at kafka.utils.Utils$.read(Utils.scala:394)
>>>> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:103)
>>>> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
>>>> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>>> >         - locked <0x00007f541d1e96a0> (a java.lang.Object)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>>>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>>>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>>>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>>>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-5"
>>>> > prio=10 tid=0x00007f52f800a000 nid=0x5b13 runnable
>>>> > [0x00007f53b45ce000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at sun.nio.ch.FileDispatcher.read0(Native Method)
>>>> >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>>>> >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:237)
>>>> >         at sun.nio.ch.IOUtil.read(IOUtil.java:210)
>>>> >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
>>>> >         - locked <0x00007f541d1e7910> (a java.lang.Object)
>>>> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:42)
>>>> >         - locked <0x00007f541d1e7920> (a java.lang.Object)
>>>> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:92)
>>>> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>>>> >         - locked <0x00007f541d1e7930> (a sun.nio.ch.ChannelInputStream)
>>>> >         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>>>> >         - locked <0x00007f541d1e79a8> (a java.lang.Object)
>>>> >         at kafka.utils.Utils$.read(Utils.scala:394)
>>>> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:103)
>>>> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
>>>> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>>> >         - locked <0x00007f541d1e79b8> (a java.lang.Object)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>>>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>>>> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>>>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>>>> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>>>> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> > "mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-leader-finder-thread"
>>>> > prio=10 tid=0x00007f534800c800 nid=0x5b11 waiting on condition
>>>> > [0x00007f53b44cd000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d212d48> (a
>>>> > java.util.concurrent.CountDownLatch$Sync)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>>>> >         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>>>> >         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>>>> >         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:70)
>>>> >         at kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$2.apply(AbstractFetcherManager.scala:68)
>>>> >         at kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$2.apply(AbstractFetcherManager.scala:66)
>>>> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>>>> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>>>> >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>>>> >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>>>> >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>>>> >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>>>> >         at kafka.server.AbstractFetcherManager.shutdownIdleFetcherThreads(AbstractFetcherManager.scala:66)
>>>> >         - locked <0x00007f541d151740> (a java.lang.Object)
>>>> >         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:107)
>>>> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>>> >
>>>> > "ProducerThread-8" prio=10 tid=0x00007f582041c800 nid=0x16e7 waiting
>>>> > on condition [0x00007f53b48d1000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-7" prio=10 tid=0x00007f582041a800 nid=0x16e6 waiting
>>>> > on condition [0x00007f53b49d2000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-6" prio=10 tid=0x00007f5820418800 nid=0x16e5 waiting
>>>> > on condition [0x00007f53b4ad3000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-5" prio=10 tid=0x00007f5820416800 nid=0x16e4 waiting
>>>> > on condition [0x00007f53b4bd4000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-4" prio=10 tid=0x00007f5820414800 nid=0x16e3 waiting
>>>> > on condition [0x00007f53b4cd5000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-3" prio=10 tid=0x00007f5820412800 nid=0x16e2 waiting
>>>> > on condition [0x00007f53b4dd6000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-2" prio=10 tid=0x00007f5820410800 nid=0x16e1 waiting
>>>> > on condition [0x00007f53b4ed7000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "ProducerThread-1" prio=10 tid=0x00007f582040f000 nid=0x16e0 waiting
>>>> > on condition [0x00007f53b4fd8000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19ffb8> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>>>> >         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>>>> >
>>>> > "mirrormaker-3" prio=10 tid=0x00007f582040d800 nid=0x16df waiting on
>>>> > condition [0x00007f53b50d9000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d0fbe00> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>>>> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>>>> >         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>>>> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>>>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>>>> >         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>>>> >
>>>> > "mirrormaker-2" prio=10 tid=0x00007f582040c800 nid=0x16de waiting on
>>>> > condition [0x00007f53b51da000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d135d30> (a
>>>> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>>>> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>>>> >         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>>>> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>>>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>>>> >         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>>>> >
>>>> > "mirrormaker-1" prio=10 tid=0x00007f582040c000 nid=0x16dd waiting on condition [0x00007f53b52db000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d113fb0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>>>> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>>>> >         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>>>> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>>>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>>>> >         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>>>> >
>>>> > "mirrormaker-0" prio=10 tid=0x00007f582040b800 nid=0x16dc waiting on condition [0x00007f53b53dc000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d186aa8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>>>> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>>>> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>>>> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>>>> >         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>>>> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>>>> >         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>>>> >         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>>>> >
>>>> > "main-EventThread" daemon prio=10 tid=0x00007f58203e9800 nid=0x16db waiting on condition [0x00007f53b54dd000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14cc38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
>>>> >
>>>> > "main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)" daemon prio=10 tid=0x00007f58203e8800 nid=0x16da runnable [0x00007f53b55de000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>> >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
>>>> >         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>>>> >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>>>> >         - locked <0x00007f541d133788> (a sun.nio.ch.Util$1)
>>>> >         - locked <0x00007f541d1337a0> (a java.util.Collections$UnmodifiableSet)
>>>> >         - locked <0x00007f541d133710> (a sun.nio.ch.EPollSelectorImpl)
>>>> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>>>> >         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1134)
>>>> >
>>>> > "ZkClient-EventThread-29-mandm-zookeeper-asg.data.sfdc.net:2181" daemon prio=10 tid=0x00007f58203b8000 nid=0x16d9 waiting on condition [0x00007f53b56df000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14cdc8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
>>>> >
>>>> > "mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553_watcher_executor" prio=10 tid=0x00007f5820343000 nid=0x16d7 waiting on condition [0x00007f53b58e1000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14cf58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2116)
>>>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:319)
>>>> >
>>>> > "Kafka-consumer-autocommit-1" prio=10 tid=0x00007f58203aa000 nid=0x16d6 waiting on condition [0x00007f53b59e2000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d159288> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>>>> >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>>>> >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>>>> >         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>>>> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>>>> >         at java.lang.Thread.run(Thread.java:619)
>>>> >
>>>> > "main-EventThread" daemon prio=10 tid=0x00007f5820339000 nid=0x16d5 waiting on condition [0x00007f53b5ae3000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d158190> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
>>>> >
>>>> > "main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)" daemon prio=10 tid=0x00007f582034b000 nid=0x16d4 runnable [0x00007f53b5be4000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>> >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
>>>> >         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>>>> >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>>>> >         - locked <0x00007f541d157c10> (a sun.nio.ch.Util$1)
>>>> >         - locked <0x00007f541d157bf8> (a java.util.Collections$UnmodifiableSet)
>>>> >         - locked <0x00007f541d157510> (a sun.nio.ch.EPollSelectorImpl)
>>>> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>>>> >         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1134)
>>>> >
>>>> > "ZkClient-EventThread-23-mandm-zookeeper-asg.data.sfdc.net:2181" daemon prio=10 tid=0x00007f5820338800 nid=0x16d3 waiting on condition [0x00007f53b5ded000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d15d868> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f5820334000 nid=0x16d1 waiting on condition [0x00007f53b5eee000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d1a0938> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf99f890> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf99f8d8> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f5820332800 nid=0x16d0 waiting on condition [0x00007f53b5fef000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d17d138> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf983c78> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf983cc0> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f5820331000 nid=0x16cf waiting on condition [0x00007f53b60f0000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d1846e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cff19708> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cff19750> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f582032f800 nid=0x16ce waiting on condition [0x00007f53b61f1000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d1a06f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf982c20> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf982c68> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f582032c000 nid=0x16cd waiting on condition [0x00007f53b62f2000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d175c98> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf9a3350> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf9a3398> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f582032a800 nid=0x16cc waiting on condition [0x00007f53b63f3000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d135cd8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf984498> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf9844e0> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f58202ef800 nid=0x16cb waiting on condition [0x00007f53b64f4000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d19f428> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf9a2b30> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf9a2b78> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "ProducerSendThread-" prio=10 tid=0x00007f58202d4800 nid=0x16ca waiting on condition [0x00007f53b65f5000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d1a0990> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>>>> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf99f070> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>>>> >         - locked <0x00007f56cf99f0b8> (a scala.collection.immutable.Stream$Cons)
>>>> >         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>>>> >         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>>>> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>> >
>>>> > "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007f58202d6800 nid=0x16c9 waiting on condition [0x00007f53b66f6000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d0f73f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>>>> >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>>>> >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>>>> >         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>>>> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>>>> >         at java.lang.Thread.run(Thread.java:619)
>>>> >
>>>> > "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007f5820306800 nid=0x16c8 waiting on condition [0x00007f53b67f7000]
>>>> >    java.lang.Thread.State: TIMED_WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d0f73f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>>>> >         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>>>> >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>>>> >         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>>>> >         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>>>> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>>>> >         at java.lang.Thread.run(Thread.java:619)
>>>> >
>>>> > "RMI TCP Accept-0" daemon prio=10 tid=0x00007f58202ab800 nid=0x16c6
>>>> > runnable [0x00007f53b69f9000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at java.net.PlainSocketImpl.socketAccept(Native Method)
>>>> >         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
>>>> >         - locked <0x00007f541d0f77d0> (a java.net.SocksSocketImpl)
>>>> >         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>>>> >         at java.net.ServerSocket.accept(ServerSocket.java:421)
>>>> >         at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:34)
>>>> >         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>>>> >         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>>>> >         at java.lang.Thread.run(Thread.java:619)
>>>> >
>>>> > "RMI TCP Accept-6665" daemon prio=10 tid=0x00007f58202a6000 nid=0x16c5
>>>> > runnable [0x00007f53b6afa000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at java.net.PlainSocketImpl.socketAccept(Native Method)
>>>> >         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
>>>> >         - locked <0x00007f541d116970> (a java.net.SocksSocketImpl)
>>>> >         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>>>> >         at java.net.ServerSocket.accept(ServerSocket.java:421)
>>>> >         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>>>> >         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>>>> >         at java.lang.Thread.run(Thread.java:619)
>>>> >
>>>> > "RMI TCP Accept-0" daemon prio=10 tid=0x00007f5820295000 nid=0x16c4
>>>> > runnable [0x00007f53b6bfb000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >         at java.net.PlainSocketImpl.socketAccept(Native Method)
>>>> >         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
>>>> >         - locked <0x00007f541d167190> (a java.net.SocksSocketImpl)
>>>> >         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>>>> >         at java.net.ServerSocket.accept(ServerSocket.java:421)
>>>> >         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>>>> >         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>>>> >         at java.lang.Thread.run(Thread.java:619)
>>>> >
>>>> > "Low Memory Detector" daemon prio=10 tid=0x00007f58200b8000 nid=0x16c3
>>>> > runnable [0x0000000000000000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >
>>>> > "CompilerThread1" daemon prio=10 tid=0x00007f58200b6000 nid=0x16c2
>>>> > waiting on condition [0x0000000000000000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >
>>>> > "CompilerThread0" daemon prio=10 tid=0x00007f58200b3000 nid=0x16c1
>>>> > waiting on condition [0x0000000000000000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >
>>>> > "Signal Dispatcher" daemon prio=10 tid=0x00007f58200b1000 nid=0x16c0
>>>> > runnable [0x0000000000000000]
>>>> >    java.lang.Thread.State: RUNNABLE
>>>> >
>>>> > "Finalizer" daemon prio=10 tid=0x00007f5820092800 nid=0x16bf in
>>>> > Object.wait() [0x00007f53b77f6000]
>>>> >    java.lang.Thread.State: WAITING (on object monitor)
>>>> >         at java.lang.Object.wait(Native Method)
>>>> >         - waiting on <0x00007f541d0fa430> (a java.lang.ref.ReferenceQueue$Lock)
>>>> >         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
>>>> >         - locked <0x00007f541d0fa430> (a java.lang.ref.ReferenceQueue$Lock)
>>>> >         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
>>>> >         at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>>>> >
>>>> > "Reference Handler" daemon prio=10 tid=0x00007f5820090800 nid=0x16be
>>>> > in Object.wait() [0x00007f53b78f7000]
>>>> >    java.lang.Thread.State: WAITING (on object monitor)
>>>> >         at java.lang.Object.wait(Native Method)
>>>> >         - waiting on <0x00007f541d11ea80> (a java.lang.ref.Reference$Lock)
>>>> >         at java.lang.Object.wait(Object.java:485)
>>>> >         at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>>>> >         - locked <0x00007f541d11ea80> (a java.lang.ref.Reference$Lock)
>>>> >
>>>> > "main" prio=10 tid=0x00007f582000a000 nid=0x16a5 waiting on condition
>>>> > [0x00007f5826bb4000]
>>>> >    java.lang.Thread.State: WAITING (parking)
>>>> >         at sun.misc.Unsafe.park(Native Method)
>>>> >         - parking to wait for  <0x00007f541d14eaa8> (a
>>>> > java.util.concurrent.CountDownLatch$Sync)
>>>> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>>>> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>>>> >         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>>>> >         at kafka.tools.MirrorMaker$MirrorMakerThread.awaitShutdown(MirrorMaker.scala:191)
>>>> >         at kafka.tools.MirrorMaker$$anonfun$main$9.apply(MirrorMaker.scala:159)
>>>> >         at kafka.tools.MirrorMaker$$anonfun$main$9.apply(MirrorMaker.scala:159)
>>>> >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>>> >         at scala.collection.immutable.List.foreach(List.scala:76)
>>>> >         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:159)
>>>> >         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>>>> >
>>>> > "VM Thread" prio=10 tid=0x00007f582008c800 nid=0x16bd runnable
>>>> >
>>>> > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f582001d000
>>>> > nid=0x16a6 runnable
>>>> >
>>>> > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f582001f000
>>>> > nid=0x16a7 runnable
>>>> >
>>>> > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f5820021000
>>>> > nid=0x16a8 runnable
>>>> >
>>>> > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f5820022800
>>>> > nid=0x16a9 runnable
>>>> >
>>>> > "GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f5820024800
>>>> > nid=0x16aa runnable
>>>> >
>>>> > "GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f5820026800
>>>> > nid=0x16ab runnable
>>>> >
>>>> > "GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f5820028000
>>>> > nid=0x16ac runnable
>>>> >
>>>> > "GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f582002a000
>>>> > nid=0x16ad runnable
>>>> >
>>>> > "GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f582002c000
>>>> > nid=0x16ae runnable
>>>> >
>>>> > "GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f582002d800
>>>> > nid=0x16af runnable
>>>> >
>>>> > "GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f582002f800
>>>> > nid=0x16b0 runnable
>>>> >
>>>> > "GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f5820031000
>>>> > nid=0x16b1 runnable
>>>> >
>>>> > "GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f5820033000
>>>> > nid=0x16b2 runnable
>>>> >
>>>> > "GC task thread#13 (ParallelGC)" prio=10 tid=0x00007f5820035000
>>>> > nid=0x16b3 runnable
>>>> >
>>>> > "GC task thread#14 (ParallelGC)" prio=10 tid=0x00007f5820036800
>>>> > nid=0x16b4 runnable
>>>> >
>>>> > "GC task thread#15 (ParallelGC)" prio=10 tid=0x00007f5820038800
>>>> > nid=0x16b5 runnable
>>>> >
>>>> > "GC task thread#16 (ParallelGC)" prio=10 tid=0x00007f582003a800
>>>> > nid=0x16b6 runnable
>>>> >
>>>> > "GC task thread#17 (ParallelGC)" prio=10 tid=0x00007f582003c000
>>>> > nid=0x16b7 runnable
>>>> >
>>>> > "GC task thread#18 (ParallelGC)" prio=10 tid=0x00007f582003e000
>>>> > nid=0x16b8 runnable
>>>> >
>>>> > "GC task thread#19 (ParallelGC)" prio=10 tid=0x00007f5820040000
>>>> > nid=0x16b9 runnable
>>>> >
>>>> > "GC task thread#20 (ParallelGC)" prio=10 tid=0x00007f5820041800
>>>> > nid=0x16ba runnable
>>>> >
>>>> > "GC task thread#21 (ParallelGC)" prio=10 tid=0x00007f5820043800
>>>> > nid=0x16bb runnable
>>>> >
>>>> > "GC task thread#22 (ParallelGC)" prio=10 tid=0x00007f5820045800
>>>> > nid=0x16bc runnable
>>>> >
>>>> > "VM Periodic Task Thread" prio=10 tid=0x00007f58202ae000 nid=0x16c7
>>>> > waiting on condition
>>>> >
>>>> > JNI global references: 1362
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
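Scanning a dump like the one above for lock cycles by hand is error-prone, and a thread that is merely parked (such as "main" waiting on a CountDownLatch in MirrorMaker$MirrorMakerThread.awaitShutdown) is not deadlocked. As a sketch that was not proposed in this thread, the standard java.lang.management.ThreadMXBean can report cycles directly: its findDeadlockedThreads() covers both object monitors and ownable synchronizers such as ReentrantLock. The class name DeadlockCheck and the two demo threads are hypothetical and exist only to create a cycle to detect.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class DeadlockCheck {

    /** Names of threads the JVM reports as deadlocked; empty array if there are none. */
    static String[] deadlockedThreadNames() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Detects cycles on object monitors and on ownable synchronizers
        // (e.g. ReentrantLock); returns null when no cycle exists.
        long[] ids = mx.findDeadlockedThreads();
        if (ids == null) {
            return new String[0];
        }
        ThreadInfo[] infos = mx.getThreadInfo(ids);
        String[] names = new String[infos.length];
        for (int i = 0; i < infos.length; i++) {
            names[i] = (infos[i] == null) ? "<exited>" : infos[i].getThreadName();
        }
        return names;
    }

    /** Demo only (hypothetical): two daemon threads take two locks in opposite order. */
    static void provokeDeadlock() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        startDaemon("demo-1", a, b, bothHoldFirstLock);
        startDaemon("demo-2", b, a, bothHoldFirstLock);
    }

    private static void startDaemon(String name, ReentrantLock first,
                                    ReentrantLock second, CountDownLatch latch) {
        Thread t = new Thread(() -> {
            first.lock();
            latch.countDown();
            try {
                latch.await();   // wait until the other thread holds its first lock
                second.lock();   // blocks forever: classic lock-ordering cycle
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.unlock();  // only reached if interrupted while awaiting the latch
            }
        }, name);
        t.setDaemon(true);       // daemon threads let the JVM exit despite the deadlock
        t.start();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("before: " + deadlockedThreadNames().length + " deadlocked");
        provokeDeadlock();
        String[] names = new String[0];
        // Poll briefly: the threads need a moment to reach the blocking lock() call.
        for (int i = 0; i < 50 && names.length == 0; i++) {
            Thread.sleep(100);
            names = deadlockedThreadNames();
        }
        System.out.println("deadlocked: " + String.join(", ", names));
    }
}
```

In a running mirror maker the same MXBean is reachable over JMX, so the check can be done without restarting the process.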
>>>> > On Tue, Sep 3, 2013 at 9:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>>>> >
>>>> >> 16 GB is a very large heap. GC tuning becomes trickier as the size of
>>>> >> the heap increases. Are you sure you need that much memory to operate
>>>> >> the mirror maker? For us, the following GC settings have worked well:
>>>> >>
>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Java
>>>> >>
>>>> >> Thanks,
>>>> >> Neha
>>>> >>
>>>> >>
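To confirm whether GC pauses explain the session timeouts, the usual first step is GC logging (on HotSpot JVMs of that era, -verbose:gc with -XX:+PrintGCDetails and -XX:+PrintGCDateStamps). As an in-process alternative, here is a minimal sketch that samples the standard GarbageCollectorMXBean counters over a window. The class name, window length and alert threshold are hypothetical. For concurrent collectors such as CMS the reported time may include concurrent phases, so a large value is a signal to read the GC log, not an exact pause length.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;

class GcTimeSampler {

    /** Cumulative collection time in ms per collector name (-1 if unsupported). */
    static Map<String, Long> snapshot() {
        Map<String, Long> times = new LinkedHashMap<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            times.put(gc.getName(), gc.getCollectionTime());
        }
        return times;
    }

    /** GC time accumulated between two snapshots, summed over all collectors. */
    static long gcMillisBetween(Map<String, Long> before, Map<String, Long> after) {
        long total = 0;
        for (Map.Entry<String, Long> e : after.entrySet()) {
            long now = e.getValue();
            long prev = before.getOrDefault(e.getKey(), 0L);
            if (now >= 0 && prev >= 0) {  // skip collectors that report -1
                total += now - prev;
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        final long windowMs = 5_000;  // hypothetical sampling window
        final long alertMs = 2_000;   // hypothetical alert threshold
        for (int i = 0; i < 3; i++) {
            Map<String, Long> before = snapshot();
            Thread.sleep(windowMs);
            long gcMs = gcMillisBetween(before, snapshot());
            System.out.printf("GC time in last %d ms: %d ms%s%n",
                    windowMs, gcMs, gcMs >= alertMs ? "  <-- investigate" : "");
        }
    }
}
```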
>>>> >> On Tue, Sep 3, 2013 at 10:40 AM, Rajasekar Elango <relango@salesforce.com> wrote:
>>>> >>
>>>> >> > Thanks Neha,
>>>> >> >
>>>> >> > I did not take a thread dump before restarting; I will get one when it
>>>> >> > happens again. We are using a 16 GB JVM heap. Do you have a
>>>> >> > recommendation on JVM GC options?
>>>> >> >
>>>> >> > Thanks,
>>>> >> > Raja.
>>>> >> >
>>>> >> >
>>>> >> > On Tue, Sep 3, 2013 at 12:26 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>>>> >> >
>>>> >> > > 2013-09-01 05:59:27,792 [main-EventThread] INFO  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed (Disconnected)
>>>> >> > > 2013-09-01 05:59:27,692 [main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)] INFO  (org.apache.zookeeper.ClientCnxn)  - Client session timed out, have not heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket connection and attempting reconnect
>>>> >> > >
>>>> >> > > This indicates that your mirror maker and/or your ZooKeeper cluster is
>>>> >> > > GCing for long periods of time. I have observed that if "client
>>>> >> > > session timed out" happens too many times, the client tends to lose
>>>> >> > > ZooKeeper watches. This is a potential bug in ZooKeeper. If this
>>>> >> > > happens, your mirror maker instance might not rebalance correctly and
>>>> >> > > will start losing data.
>>>> >> > >
>>>> >> > > You mentioned consumption/production stopped on your mirror maker;
>>>> >> > > could you please take a thread dump and point us to it? Meanwhile, you
>>>> >> > > might want to fix the GC pauses.
>>>> >> > >
>>>> >> > > Thanks,
>>>> >> > > Neha
>>>> >> > >
>>>> >> > >
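The 4002 ms in that log line is consistent with a short session timeout. A worked sketch under two assumptions not confirmed in this thread: the mirror maker's consumer ran with Kafka 0.8's default zookeeper.session.timeout.ms of 6000 ms, and the ZooKeeper 3.x Java client reports "Client session timed out" once it has heard nothing from the server for two-thirds of the session timeout. Under those assumptions, any stall (GC or network) of roughly 4 s trips the check. The ZooKeeper server may also clamp the requested timeout to its configured bounds (by default 2x and 20x tickTime). The class and method names are hypothetical.

```java
class ZkTimeoutMath {

    /** Assumed ZooKeeper 3.x client behaviour: read timeout is 2/3 of the session timeout. */
    static long readTimeoutMs(long sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    /** True if hearing nothing from the server for idleMs would trip the client-side timeout. */
    static boolean tripsReadTimeout(long sessionTimeoutMs, long idleMs) {
        return idleMs >= readTimeoutMs(sessionTimeoutMs);
    }

    public static void main(String[] args) {
        long defaultSessionMs = 6_000;  // assumed zookeeper.session.timeout.ms (Kafka 0.8 default)
        long observedIdleMs = 4_002;    // "have not heard from server in 4002ms"
        System.out.println("read timeout: " + readTimeoutMs(defaultSessionMs) + " ms");
        System.out.println("trips at 6000 ms session:  "
                + tripsReadTimeout(defaultSessionMs, observedIdleMs));
        // A larger (hypothetical) session timeout leaves more headroom for pauses.
        System.out.println("trips at 30000 ms session: "
                + tripsReadTimeout(30_000, observedIdleMs));
    }
}
```

If the GC pauses cannot be removed quickly, raising zookeeper.session.timeout.ms in the mirror maker's consumer config widens this window, at the cost of slower detection of consumers that have really died.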
>>>> >> > > On Tue, Sep 3, 2013 at 8:59 AM, Rajasekar Elango <relango@salesforce.com> wrote:
>>>> >> > >
>>>> >> > > > We found that mirrormaker stopped consuming and producing over the
>>>> >> > > > weekend (09/01). We are only seeing "Client session timed out"
>>>> >> > > > messages in the mirrormaker log. I restarted it today (09/03) to
>>>> >> > > > resume processing. Here are the log lines, in reverse order.
>>>> >> > > >
>>>> >> > > >
>>>> >> > > > 2013-09-03 14:20:40,918 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.utils.VerifiableProperties)  - Verifying properties
>>>> >> > > > 2013-09-03 14:20:40,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], begin rebalancing consumer mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #1
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Committing all offsets after clearing the fetcher queues
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Cleared the data chunks in all the consumer message iterators
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Cleared all relevant queues for this fetcher
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ConsumerFetcherManager)  - [ConsumerFetcherManager-1378218012760] All connections stopped
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ConsumerFetcherManager)  - [ConsumerFetcherManager-1378218012760] Stopping all fetchers
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ConsumerFetcherManager)  - [ConsumerFetcherManager-1378218012760] Stopping leader finder thread
>>>> >> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered
>>>> >> > > > 2013-09-03 14:20:38,876 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], end rebalancing consumer mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #0
>>>> >> > > > 2013-09-01 05:59:29,069 [main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)] INFO  (org.apache.zookeeper.ClientCnxn)  - Socket connection established to mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181, initiating session
>>>> >> > > > 2013-09-01 05:59:29,069 [main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)] INFO  (org.apache.zookeeper.ClientCnxn)  - Opening socket connection to server mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181
>>>> >> > > > 2013-09-01 05:59:27,792 [main-EventThread] INFO  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed (Disconnected)
>>>> >> > > > 2013-09-01 05:59:27,692 [main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)] INFO  (org.apache.zookeeper.ClientCnxn)  - Client session timed out, have not heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket connection and attempting reconnect
>>>> >> > > >
>>>> >> > > >
>>>> >> > > > As you can see, no log lines appeared after 2013-09-01 05:59:29. I
>>>> >> > > > checked lag using ConsumerOffsetChecker and observed that the log
>>>> >> > > > size and lag are growing, but the mirrormaker's offset remains the
>>>> >> > > > same. We have two mirrormaker processes running, and both of them
>>>> >> > > > had the same issue during the same time frame. Any hint on what
>>>> >> > > > could be the problem? How do we go about troubleshooting this?
>>>> >> > > >
>>>> >> > > > Thanks in advance.
>>>> >> > > >
>>>> >> > > > --
>>>> >> > > > Thanks,
>>>> >> > > > Raja.
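The symptom described above (log size and lag growing while the mirror maker's committed offset stays fixed) is what separates a stalled consumer from one that is merely falling behind, where lag also grows but the offset still advances. A minimal sketch of that rule, assuming periodic (log size, consumer offset) samples per partition are already available, for example transcribed from repeated ConsumerOffsetChecker runs. The StallDetector and Sample classes and all numbers are hypothetical, and the sketch does not parse the tool's real output.

```java
import java.util.Arrays;
import java.util.List;

class StallDetector {

    /** One hypothetical sample for a single partition. */
    static final class Sample {
        final long logSize;         // broker log end offset
        final long consumerOffset;  // committed offset of the mirror maker's group

        Sample(long logSize, long consumerOffset) {
            this.logSize = logSize;
            this.consumerOffset = consumerOffset;
        }

        long lag() {
            return logSize - consumerOffset;
        }
    }

    /**
     * A partition looks stalled (rather than merely slow) when the log grows
     * across samples while the committed offset never moves.
     */
    static boolean isStalled(List<Sample> samples) {
        if (samples.size() < 2) {
            return false;  // need at least two points to see a trend
        }
        Sample first = samples.get(0);
        Sample last = samples.get(samples.size() - 1);
        boolean logGrew = last.logSize > first.logSize;
        boolean offsetFrozen = samples.stream()
                .allMatch(s -> s.consumerOffset == first.consumerOffset);
        return logGrew && offsetFrozen;
    }

    public static void main(String[] args) {
        // Made-up numbers standing in for three successive checks.
        List<Sample> stuck = Arrays.asList(
                new Sample(1_000, 900), new Sample(1_500, 900), new Sample(2_000, 900));
        List<Sample> slow = Arrays.asList(
                new Sample(1_000, 900), new Sample(1_500, 1_100), new Sample(2_000, 1_300));
        System.out.println("stuck: stalled=" + isStalled(stuck)
                + ", lag=" + stuck.get(stuck.size() - 1).lag());
        System.out.println("slow:  stalled=" + isStalled(slow)
                + ", lag=" + slow.get(slow.size() - 1).lag());
    }
}
```

Both series show growing lag; only the first has a frozen offset, which matches what was seen on both mirror maker processes.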
>>>> >> > > >
>>>> >> > >
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > --
>>>> >> > Thanks,
>>>> >> > Raja.
>>>> >> >
>>>> >>
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > Thanks,
>>>> > Raja.
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Raja.
>>>
>>
>>
>>
>> --
>> Thanks,
>> Raja.
