Hi all,

We are seeing an issue in one of our environments with a topology that uses a Kafka spout: when nextTuple is called, we get a java.nio.channels.ClosedChannelException. This happens when the worker attempts to ask ZooKeeper for the Kafka message offset inside KafkaUtils.getOffset(…), which calls SimpleConsumer.getOffsetsBefore(…). The stack trace is below.

In the ZooKeeper log, we see ZooKeeper cleaning up its open connections, with the reason given as “EndOfStreamException: Unable to read additional data from client sessionid 0x1566ed464ec03ad, likely client has closed socket” (that is, the client terminated the connection).

What we have checked so far:

* We can telnet from the Storm worker to ZooKeeper on port 2181.

* We can ping the Storm worker from the ZooKeeper host.

* The same code is deployed with the same setup in another environment, where it works as expected with no issues.
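
For anyone who wants to repeat the connectivity checks from inside the worker's JVM rather than with telnet, here is a minimal sketch. Since the trace fails in kafka.network.BlockingChannel.send, it probably makes sense to probe the Kafka broker port as well as ZooKeeper. The class name PortProbe and the endpoint names are placeholders we made up for illustration, and 9092 is only an assumed broker port, not something taken from our configuration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder endpoints (assumptions): pass the real host:port pairs as arguments.
        String[] endpoints = args.length > 0
                ? args
                : new String[] {"zookeeper.example:2181", "kafka-broker.example:9092"};
        for (String endpoint : endpoints) {
            int idx = endpoint.lastIndexOf(':');
            String host = endpoint.substring(0, idx);
            int port = Integer.parseInt(endpoint.substring(idx + 1));
            System.out.println(endpoint + " reachable=" + canConnect(host, port, 3000));
        }
    }
}
```

Running it on the worker host with the real ZooKeeper and broker addresses as arguments would show whether each port accepts TCP connections from that machine.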

Is there some kind of ZooKeeper connection timeout that could be causing KafkaUtils to drop the connection, or are there Netty timeout settings we could change that might be behind this error? What else can cause a ClosedChannelException?
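
For context, our understanding of the JDK is that ClosedChannelException is thrown on the client side whenever an I/O operation is attempted on a channel that is already closed. The sketch below reproduces that with plain NIO on a loopback socket. It only illustrates the exception itself and is not a claim about what the Kafka client does internally; the class name ClosedChannelDemo is hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ClosedChannelDemo {
    /** Returns true if writing to a locally closed channel raises ClosedChannelException. */
    static boolean writeAfterCloseThrows() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Listen on an ephemeral loopback port so the client has something to connect to.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            client.close(); // close the client side before sending anything
            try {
                client.write(ByteBuffer.wrap(new byte[] {1}));
                return false; // not expected: the write should fail
            } catch (ClosedChannelException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("write after close throws: " + writeAfterCloseThrows());
    }
}
```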


Storm worker stack trace:

09.08.2016 10:50:00.405 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) ~[stormjar.jar:1.22.0]
        at org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[stormjar.jar:1.22.0]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[stormjar.jar:1.22.0]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:1.22.0]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:1.22.0]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:1.22.0]
        ... 6 more

09.08.2016 10:50:00.410 o.a.s.d.executor [ERROR]
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) ~[stormjar.jar:1.22.0]
        at org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[stormjar.jar:1.22.0]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[stormjar.jar:1.22.0]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:1.22.0]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:1.22.0]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:1.22.0]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:1.22.0]
        ... 6 more

ZooKeeper log:

8/9/2016 11:52:07 AM 2016-08-09 10:52:07,438 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@900] - Client attempting to establish new session at /10.42.31.113:50348
8/9/2016 11:52:07 AM 2016-08-09 10:52:07,439 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@645] - Established session 0x1566ed464ec03b0 with negotiated timeout 20000 for client /10.42.31.113:50348
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,036 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
8/9/2016 11:52:09 AM EndOfStreamException: Unable to read additional data from client sessionid 0x1566ed464ec03ad, likely client has closed socket
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,036 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /10.42.31.113:50274 which had sessionid 0x1566ed464ec03ad
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,036 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
8/9/2016 11:52:09 AM EndOfStreamException: Unable to read additional data from client sessionid 0x1566ed464ec03af, likely client has closed socket
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,037 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /10.42.31.113:50344 which had sessionid 0x1566ed464ec03af
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,037 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
8/9/2016 11:52:09 AM EndOfStreamException: Unable to read additional data from client sessionid 0x1566ed464ec03ae, likely client has closed socket
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,037 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /10.42.31.113:50346 which had sessionid 0x1566ed464ec03ae
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,037 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
8/9/2016 11:52:09 AM EndOfStreamException: Unable to read additional data from client sessionid 0x1566ed464ec03b0, likely client has closed socket
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)
8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
8/9/2016 11:52:09 AM 2016-08-09 10:52:09,037 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /10.42.31.113:50348 which had sessionid 0x1566ed464ec03b0


Regards

Rama