kafka-users mailing list archives

From Joel Koshy <jjkosh...@gmail.com>
Subject Re: Mirrormaker stopped consuming
Date Thu, 05 Sep 2013 17:49:49 GMT
Your thread dump looks similar to the deadlock in
https://issues.apache.org/jira/browse/KAFKA-937. I thought that fix was
included in the beta release. Which version of Kafka are you using?


On Thu, Sep 5, 2013 at 8:35 AM, Rajasekar Elango <relango@salesforce.com> wrote:
> Thanks, I am working on tuning GC options. This happened again today and
> mirrormaker stopped consuming. This time the last exception was:
>
> 2013-09-05 07:01:52,931
> [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-2]
> WARN  (kafka.consumer.ConsumerFetcherThread)  -
> [ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-2],
> error for partition [jmx,5] to broker 2
> kafka.common.NotLeaderForPartitionException
>         at sun.reflect.GeneratedConstructorAccessor18.newInstance(Unknown Source)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>         at java.lang.Class.newInstance0(Class.java:355)
>         at java.lang.Class.newInstance(Class.java:308)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
>         at kafka.utils.Logging$class.warn(Logging.scala:88)
>         at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
>
> I also captured a thread dump and am pasting it below. Can you confirm whether
> this could again be due to a full GC, or whether something else needs to be
> looked at?
>
>
>
>
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (17.0-b16 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x00007f539c001800 nid=0x6b49
> waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-3"
> prio=10 tid=0x00007f52f800c000 nid=0x5e03 waiting on condition
> [0x00007f53b46cf000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14f5a0> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-2"
> prio=10 tid=0x00007f52f8006800 nid=0x5e02 waiting on condition
> [0x00007f53b43cc000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14f5a0> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-1"
> prio=10 tid=0x00007f52f8004800 nid=0x3d8c waiting on condition
> [0x00007f53b57e0000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14f5a0> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:168)
>         at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:70)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:171)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-4"
> prio=10 tid=0x00007f52f800a800 nid=0x5b14 runnable
> [0x00007f53b47d0000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.FileDispatcher.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:237)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:210)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
>         - locked <0x00007f541d1e95f8> (a java.lang.Object)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:42)
>         - locked <0x00007f541d1e9608> (a java.lang.Object)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:92)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>         - locked <0x00007f541d1e9618> (a sun.nio.ch.ChannelInputStream)
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>         - locked <0x00007f541d1e9690> (a java.lang.Object)
>         at kafka.utils.Utils$.read(Utils.scala:394)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:103)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>         - locked <0x00007f541d1e96a0> (a java.lang.Object)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-0-5"
> prio=10 tid=0x00007f52f800a000 nid=0x5b13 runnable
> [0x00007f53b45ce000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.FileDispatcher.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:237)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:210)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
>         - locked <0x00007f541d1e7910> (a java.lang.Object)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:42)
>         - locked <0x00007f541d1e7920> (a java.lang.Object)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:92)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>         - locked <0x00007f541d1e7930> (a sun.nio.ch.ChannelInputStream)
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>         - locked <0x00007f541d1e79a8> (a java.lang.Object)
>         at kafka.utils.Utils$.read(Utils.scala:394)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:103)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>         - locked <0x00007f541d1e79b8> (a java.lang.Object)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553-leader-finder-thread"
> prio=10 tid=0x00007f534800c800 nid=0x5b11 waiting on condition
> [0x00007f53b44cd000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d212d48> (a
> java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:70)
>         at kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$2.apply(AbstractFetcherManager.scala:68)
>         at kafka.server.AbstractFetcherManager$$anonfun$shutdownIdleFetcherThreads$2.apply(AbstractFetcherManager.scala:66)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>         at kafka.server.AbstractFetcherManager.shutdownIdleFetcherThreads(AbstractFetcherManager.scala:66)
>         - locked <0x00007f541d151740> (a java.lang.Object)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:107)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ProducerThread-8" prio=10 tid=0x00007f582041c800 nid=0x16e7 waiting
> on condition [0x00007f53b48d1000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-7" prio=10 tid=0x00007f582041a800 nid=0x16e6 waiting
> on condition [0x00007f53b49d2000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-6" prio=10 tid=0x00007f5820418800 nid=0x16e5 waiting
> on condition [0x00007f53b4ad3000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-5" prio=10 tid=0x00007f5820416800 nid=0x16e4 waiting
> on condition [0x00007f53b4bd4000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-4" prio=10 tid=0x00007f5820414800 nid=0x16e3 waiting
> on condition [0x00007f53b4cd5000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-3" prio=10 tid=0x00007f5820412800 nid=0x16e2 waiting
> on condition [0x00007f53b4dd6000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-2" prio=10 tid=0x00007f5820410800 nid=0x16e1 waiting
> on condition [0x00007f53b4ed7000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "ProducerThread-1" prio=10 tid=0x00007f582040f000 nid=0x16e0 waiting
> on condition [0x00007f53b4fd8000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19ffb8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>         at kafka.tools.KafkaMigrationTool$ProducerDataChannel.receiveRequest(KafkaMigrationTool.java:296)
>         at kafka.tools.KafkaMigrationTool$ProducerThread.run(KafkaMigrationTool.java:387)
>
> "mirrormaker-3" prio=10 tid=0x00007f582040d800 nid=0x16df waiting on
> condition [0x00007f53b50d9000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d0fbe00> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>
> "mirrormaker-2" prio=10 tid=0x00007f582040c800 nid=0x16de waiting on
> condition [0x00007f53b51da000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d135d30> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>
> "mirrormaker-1" prio=10 tid=0x00007f582040c000 nid=0x16dd waiting on
> condition [0x00007f53b52db000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d113fb0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>
> "mirrormaker-0" prio=10 tid=0x00007f582040b800 nid=0x16dc waiting on
> condition [0x00007f53b53dc000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d186aa8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:176)
>
> "main-EventThread" daemon prio=10 tid=0x00007f58203e9800 nid=0x16db
> waiting on condition [0x00007f53b54dd000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14cc38> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
>
> "main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)" daemon
> prio=10 tid=0x00007f58203e8800 nid=0x16da runnable
> [0x00007f53b55de000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>         - locked <0x00007f541d133788> (a sun.nio.ch.Util$1)
>         - locked <0x00007f541d1337a0> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00007f541d133710> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1134)
>
> "ZkClient-EventThread-29-mandm-zookeeper-asg.data.sfdc.net:2181"
> daemon prio=10 tid=0x00007f58203b8000 nid=0x16d9 waiting on condition
> [0x00007f53b56df000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14cdc8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
>
> "mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1378308858270-7a2f6553_watcher_executor"
> prio=10 tid=0x00007f5820343000 nid=0x16d7 waiting on condition
> [0x00007f53b58e1000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14cf58> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2116)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:319)
>
> "Kafka-consumer-autocommit-1" prio=10 tid=0x00007f58203aa000
> nid=0x16d6 waiting on condition [0x00007f53b59e2000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d159288> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>         at java.lang.Thread.run(Thread.java:619)
>
> "main-EventThread" daemon prio=10 tid=0x00007f5820339000 nid=0x16d5
> waiting on condition [0x00007f53b5ae3000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d158190> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
>
> "main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)" daemon
> prio=10 tid=0x00007f582034b000 nid=0x16d4 runnable
> [0x00007f53b5be4000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>         - locked <0x00007f541d157c10> (a sun.nio.ch.Util$1)
>         - locked <0x00007f541d157bf8> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00007f541d157510> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1134)
>
> "ZkClient-EventThread-23-mandm-zookeeper-asg.data.sfdc.net:2181" daemon prio=10 tid=0x00007f5820338800 nid=0x16d3 waiting on condition [0x00007f53b5ded000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d15d868> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
>
> "ProducerSendThread-" prio=10 tid=0x00007f5820334000 nid=0x16d1
> waiting on condition [0x00007f53b5eee000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d1a0938> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf99f890> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf99f8d8> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f5820332800 nid=0x16d0
> waiting on condition [0x00007f53b5fef000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d17d138> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf983c78> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf983cc0> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f5820331000 nid=0x16cf
> waiting on condition [0x00007f53b60f0000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d1846e0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cff19708> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cff19750> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f582032f800 nid=0x16ce
> waiting on condition [0x00007f53b61f1000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d1a06f0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf982c20> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf982c68> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f582032c000 nid=0x16cd
> waiting on condition [0x00007f53b62f2000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d175c98> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf9a3350> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf9a3398> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f582032a800 nid=0x16cc
> waiting on condition [0x00007f53b63f3000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d135cd8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf984498> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf9844e0> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f58202ef800 nid=0x16cb
> waiting on condition [0x00007f53b64f4000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d19f428> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf9a2b30> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf9a2b78> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "ProducerSendThread-" prio=10 tid=0x00007f58202d4800 nid=0x16ca
> waiting on condition [0x00007f53b65f5000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d1a0990> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:65)
>         at scala.collection.immutable.Stream$.continually(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf99f070> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:782)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1060)
>         - locked <0x00007f56cf99f0b8> (a scala.collection.immutable.Stream$Cons)
>         at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1052)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:527)
>         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007f58202d6800
> nid=0x16c9 waiting on condition [0x00007f53b66f6000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d0f73f8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>         at java.lang.Thread.run(Thread.java:619)
>
> "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007f5820306800
> nid=0x16c8 waiting on condition [0x00007f53b67f7000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d0f73f8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
>         at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>         at java.lang.Thread.run(Thread.java:619)
>
> "RMI TCP Accept-0" daemon prio=10 tid=0x00007f58202ab800 nid=0x16c6
> runnable [0x00007f53b69f9000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.PlainSocketImpl.socketAccept(Native Method)
>         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
>         - locked <0x00007f541d0f77d0> (a java.net.SocksSocketImpl)
>         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>         at java.net.ServerSocket.accept(ServerSocket.java:421)
>         at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:34)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>         at java.lang.Thread.run(Thread.java:619)
>
> "RMI TCP Accept-6665" daemon prio=10 tid=0x00007f58202a6000 nid=0x16c5
> runnable [0x00007f53b6afa000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.PlainSocketImpl.socketAccept(Native Method)
>         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
>         - locked <0x00007f541d116970> (a java.net.SocksSocketImpl)
>         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>         at java.net.ServerSocket.accept(ServerSocket.java:421)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>         at java.lang.Thread.run(Thread.java:619)
>
> "RMI TCP Accept-0" daemon prio=10 tid=0x00007f5820295000 nid=0x16c4
> runnable [0x00007f53b6bfb000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.PlainSocketImpl.socketAccept(Native Method)
>         at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
>         - locked <0x00007f541d167190> (a java.net.SocksSocketImpl)
>         at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>         at java.net.ServerSocket.accept(ServerSocket.java:421)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>         at java.lang.Thread.run(Thread.java:619)
>
> "Low Memory Detector" daemon prio=10 tid=0x00007f58200b8000 nid=0x16c3
> runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread1" daemon prio=10 tid=0x00007f58200b6000 nid=0x16c2
> waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread0" daemon prio=10 tid=0x00007f58200b3000 nid=0x16c1
> waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" daemon prio=10 tid=0x00007f58200b1000 nid=0x16c0
> runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" daemon prio=10 tid=0x00007f5820092800 nid=0x16bf in
> Object.wait() [0x00007f53b77f6000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x00007f541d0fa430> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
>         - locked <0x00007f541d0fa430> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
>         at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>
> "Reference Handler" daemon prio=10 tid=0x00007f5820090800 nid=0x16be
> in Object.wait() [0x00007f53b78f7000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x00007f541d11ea80> (a java.lang.ref.Reference$Lock)
>         at java.lang.Object.wait(Object.java:485)
>         at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>         - locked <0x00007f541d11ea80> (a java.lang.ref.Reference$Lock)
>
> "main" prio=10 tid=0x00007f582000a000 nid=0x16a5 waiting on condition
> [0x00007f5826bb4000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f541d14eaa8> (a
> java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>         at kafka.tools.MirrorMaker$MirrorMakerThread.awaitShutdown(MirrorMaker.scala:191)
>         at kafka.tools.MirrorMaker$$anonfun$main$9.apply(MirrorMaker.scala:159)
>         at kafka.tools.MirrorMaker$$anonfun$main$9.apply(MirrorMaker.scala:159)
>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>         at scala.collection.immutable.List.foreach(List.scala:76)
>         at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:159)
>         at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> "VM Thread" prio=10 tid=0x00007f582008c800 nid=0x16bd runnable
>
> "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f582001d000
> nid=0x16a6 runnable
>
> "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f582001f000
> nid=0x16a7 runnable
>
> "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f5820021000
> nid=0x16a8 runnable
>
> "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f5820022800
> nid=0x16a9 runnable
>
> "GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f5820024800
> nid=0x16aa runnable
>
> "GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f5820026800
> nid=0x16ab runnable
>
> "GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f5820028000
> nid=0x16ac runnable
>
> "GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f582002a000
> nid=0x16ad runnable
>
> "GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f582002c000
> nid=0x16ae runnable
>
> "GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f582002d800
> nid=0x16af runnable
>
> "GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f582002f800
> nid=0x16b0 runnable
>
> "GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f5820031000
> nid=0x16b1 runnable
>
> "GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f5820033000
> nid=0x16b2 runnable
>
> "GC task thread#13 (ParallelGC)" prio=10 tid=0x00007f5820035000
> nid=0x16b3 runnable
>
> "GC task thread#14 (ParallelGC)" prio=10 tid=0x00007f5820036800
> nid=0x16b4 runnable
>
> "GC task thread#15 (ParallelGC)" prio=10 tid=0x00007f5820038800
> nid=0x16b5 runnable
>
> "GC task thread#16 (ParallelGC)" prio=10 tid=0x00007f582003a800
> nid=0x16b6 runnable
>
> "GC task thread#17 (ParallelGC)" prio=10 tid=0x00007f582003c000
> nid=0x16b7 runnable
>
> "GC task thread#18 (ParallelGC)" prio=10 tid=0x00007f582003e000
> nid=0x16b8 runnable
>
> "GC task thread#19 (ParallelGC)" prio=10 tid=0x00007f5820040000
> nid=0x16b9 runnable
>
> "GC task thread#20 (ParallelGC)" prio=10 tid=0x00007f5820041800
> nid=0x16ba runnable
>
> "GC task thread#21 (ParallelGC)" prio=10 tid=0x00007f5820043800
> nid=0x16bb runnable
>
> "GC task thread#22 (ParallelGC)" prio=10 tid=0x00007f5820045800
> nid=0x16bc runnable
>
> "VM Periodic Task Thread" prio=10 tid=0x00007f58202ae000 nid=0x16c7
> waiting on condition
>
> JNI global references: 1362
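A dump this long is easier to read once the threads are grouped by state. The following is a minimal sketch, assuming the dump is available as text in the jstack-style layout quoted above (possibly still carrying "> " quote prefixes). The function and variable names are made up for illustration and are not part of Kafka or the JDK. Threads without a `java.lang.Thread.State` line, such as the GC task threads, are not counted.

```python
import re
from collections import Counter

# Leading email quote markers such as "> " or ">> > ".
QUOTE_PREFIX = re.compile(r"^(?:>\s?)+")
# A thread header starts with the quoted thread name; the closing quote is
# optional because long names may be wrapped onto the next line.
THREAD_HEADER = re.compile(r'^"([^"]*)')
STATE_LINE = re.compile(r"java\.lang\.Thread\.State:\s+([A-Z_]+)")


def summarize_thread_states(dump_text):
    """Return (Counter of states, dict mapping state -> list of thread names)."""
    states = Counter()
    by_state = {}
    current = None
    for raw in dump_text.splitlines():
        line = QUOTE_PREFIX.sub("", raw).rstrip()
        header = THREAD_HEADER.match(line)
        if header:
            current = header.group(1)
            continue
        found = STATE_LINE.search(line)
        if found and current is not None:
            state = found.group(1)
            states[state] += 1
            by_state.setdefault(state, []).append(current)
            current = None  # one state line per thread
    return states, by_state


if __name__ == "__main__":
    import sys

    counts, names = summarize_thread_states(sys.stdin.read())
    for state, count in counts.most_common():
        print(state, count, sorted(set(names[state])))
```

Fed the dump on standard input, it prints one line per state with the thread names in that state, which makes it quicker to see which threads are parked and which are runnable.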
>
> On Tue, Sep 3, 2013 at 9:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
>> 16 GB is a very large heap. GC tuning becomes trickier as the size of the
>> heap increases. Are you sure you need that much memory to operate the
>> mirror maker? For us, the following GC settings have worked well -
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Java
>>
>> Thanks,
>> Neha
>>
>>
>> On Tue, Sep 3, 2013 at 10:40 AM, Rajasekar Elango <relango@salesforce.com> wrote:
>>
>> > Thanks Neha,
>> >
>> > I did not take a thread dump before restarting; I will get one when it
>> > happens again. We are using 16 GB of JVM heap. Do you have a
>> > recommendation on JVM GC options?
>> >
>> > Thanks,
>> > Raja.
>> >
>> >
>> > On Tue, Sep 3, 2013 at 12:26 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>> >
>> > > 2013-09-01 05:59:27,792 [main-EventThread] INFO  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed (Disconnected)
>> > > 2013-09-01 05:59:27,692 [main-SendThread(mandm-zookeeper-asg.data.sfdc.net:2181)] INFO  (org.apache.zookeeper.ClientCnxn)  - Client session timed out, have not heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket connection and attempting reconnect
>> > >
>> > > This indicates that your mirror maker and/or your zookeeper cluster is
>> > > GCing for long periods of time. I have observed that if "client session
>> > > timed out" happens too many times, the client tends to lose zookeeper
>> > > watches. This is a potential bug in zookeeper. If this happens, your
>> > > mirror maker instance might not rebalance correctly and will start
>> > > losing data.
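To judge how often "too many times" is in practice, the timeout messages can be counted straight from the mirror maker log. What follows is only a sketch: the helper names are hypothetical, and the pattern is taken from the single ClientCnxn line quoted above, so other ZooKeeper client versions may word the message differently. It tolerates email quote prefixes and wrapped lines by flattening the text first.

```python
import re

# Leading email quote markers such as "> " or ">> > > ".
QUOTE_PREFIX = re.compile(r"^(?:>\s?)+")
# Pattern based on the ClientCnxn message quoted in this thread.
TIMEOUT = re.compile(
    r"Client session timed out, have not heard from server in (\d+)ms "
    r"for sessionid (0x[0-9a-fA-F]+)"
)


def normalize(log_text):
    """Strip quote prefixes and join wrapped lines with single spaces."""
    lines = (QUOTE_PREFIX.sub("", raw).strip() for raw in log_text.splitlines())
    return " ".join(line for line in lines if line)


def find_session_timeouts(log_text):
    """Return a list of (silence_ms, session_id) for each timeout message."""
    flat = normalize(log_text)
    return [(int(ms), sid) for ms, sid in TIMEOUT.findall(flat)]


if __name__ == "__main__":
    import sys

    events = find_session_timeouts(sys.stdin.read())
    print("timeouts:", len(events))
    for silence_ms, session_id in events:
        print(session_id, silence_ms, "ms without a server response")
```

As far as I know, the ZooKeeper client declares this timeout after about two thirds of the negotiated session timeout, so the roughly 4000 ms silence above would be consistent with a 6000 ms session timeout. Please check that against your own `zookeeper.session.timeout.ms` setting rather than taking it from here.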
>> > >
>> > > You mentioned consumption/production stopped on your mirror maker;
>> > > could you please take a thread dump and point us to it? Meanwhile, you
>> > > might want to fix the GC pauses.
>> > >
>> > > Thanks,
>> > > Neha
>> > >
>> > >
>> > > On Tue, Sep 3, 2013 at 8:59 AM, Rajasekar Elango <relango@salesforce.com> wrote:
>> > >
>> > > > We found that mirrormaker stopped consuming and producing over the
>> > > > weekend (09/01). We just see "Client session timed out" messages in
>> > > > the mirrormaker log. I restarted it today (09/03) to resume
>> > > > processing. Here are the log lines in reverse order.
>> > > >
>> > > >
>> > > > 2013-09-03 14:20:40,918 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.utils.VerifiableProperties)  - Verifying properties
>> > > > 2013-09-03 14:20:40,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], begin rebalancing consumer mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #1
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Committing all offsets after clearing the fetcher queues
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Cleared the data chunks in all the consumer message iterators
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Cleared all relevant queues for this fetcher
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ConsumerFetcherManager)  - [ConsumerFetcherManager-1378218012760] All connections stopped
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ConsumerFetcherManager)  - [ConsumerFetcherManager-1378218012760] Stopping all fetchers
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ConsumerFetcherManager)  - [ConsumerFetcherManager-1378218012760] Stopping leader finder thread
>> > > > 2013-09-03 14:20:38,877 [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor] INFO  (kafka.consumer.ZookeeperConsumerConnector)  - [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered
>> > > > 2013-09-03 14:20:38,876
>> > > >
>> > > >
>> > >
>> >
>> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
>> > > > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
>> > > >
>> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
>> > > end
>> > > > rebalancing consumer
>> > > > mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506
>> try
>> > > #0
>> > > > 2013-09-01 05:59:29,069 [main-SendThread(
>> > > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
>> > > >  (org.apache.zookeeper.ClientCnxn)  - Socket connection established
>> to
>> > > > mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181, initiating
>> > session
>> > > > 2013-09-01 05:59:29,069 [main-SendThread(
>> > > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
>> > > >  (org.apache.zookeeper.ClientCnxn)  - Opening socket connection to
>> > server
>> > > > mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181
>> > > > 2013-09-01 05:59:27,792 [main-EventThread] INFO
>> > > >  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed
>> > (Disconnected)
>> > > > 2013-09-01 05:59:27,692 [main-SendThread(
>> > > > mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
>> > > >  (org.apache.zookeeper.ClientCnxn)  - Client session timed out, have
>> > not
>> > > > heard from server in 4002ms for sessionid 0x140c603da5b0032, closing
>> > > socket
>> > > > connection and attempting reconnect
>> > > >
>> > > >
>> > > > As you can see, no log lines appeared after 2013-09-01 05:59:29. I
>> > > > checked the lag using consumerOffsetChecker and observed that the
>> > > > log size and lag are growing, but the mirrormaker offset remains the
>> > > > same. We have two mirrormaker processes running, and both had the
>> > > > same issue during the same time frame. Any hint on what the problem
>> > > > could be? How do we go about troubleshooting this?
>> > > >
>> > > > Thanks in advance..
>> > > >
>> > > > --
>> > > > Thanks,
>> > > > Raja.
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > Thanks,
>> > Raja.
>> >
>>
>
>
>
> --
> Thanks,
> Raja.
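
For readers following the thread: the symptom in the original report (log size and lag growing while the consumer offset stays the same) can be checked mechanically by comparing two readings taken some time apart. The sketch below is only an illustration under stated assumptions. It does not call Kafka or the offset checker tool. The names Snapshot, lag and stalled_partitions are hypothetical, and the numbers are made up rather than taken from the reporter's cluster.

```python
from typing import Dict, List, NamedTuple, Tuple


class Snapshot(NamedTuple):
    """One reading for a partition (hypothetical structure)."""
    offset: int    # last committed consumer offset
    log_size: int  # latest offset available in the broker's log


Partition = Tuple[str, int]  # (topic, partition id)


def lag(s: Snapshot) -> int:
    # Lag is how far the consumer trails the end of the log.
    return s.log_size - s.offset


def stalled_partitions(
    before: Dict[Partition, Snapshot],
    after: Dict[Partition, Snapshot],
) -> List[Partition]:
    """Return partitions whose offset did not move while the log grew."""
    stalled = []
    for tp, old in before.items():
        new = after.get(tp)
        if new is None:
            continue  # partition missing from the second reading
        if new.offset == old.offset and new.log_size > old.log_size:
            stalled.append(tp)
    return sorted(stalled)


# Made-up readings, taken a few minutes apart.
before = {("jmx", 5): Snapshot(100, 150), ("jmx", 6): Snapshot(200, 210)}
after = {("jmx", 5): Snapshot(100, 300), ("jmx", 6): Snapshot(260, 270)}

stalled = stalled_partitions(before, after)
print(stalled, [lag(after[tp]) for tp in stalled])
```

A partition that is merely idle (no new messages produced) is not flagged, because its log size does not grow between the two readings.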
