storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rama Nallamilli <reape...@gmail.com>
Subject Help! ClosedChannelException in Storm Kafka Spout during KakfaUtils.getOffset()
Date Tue, 09 Aug 2016 11:01:59 GMT
Hi all,

We are seeing an issue on one of our environments with a topology that has
a Kafka Spout, when nextTuple is called, we end up getting a
java.nio.ClosedChannelException.  This happens when the worker attempts to
ask zookeeper for the kafka message offset inside
KafkaUtils.getOffsetsBefore(…).  The stack trace is below.

In the zookeeper log, we are seeing zookeeper clean up its open connections
with the reason being “EndOfStreamException: Unable to read additional data
from client sessionid 0x1566ed464ec03ad, likely client has closed socket” (the
client terminated the connection)

What we have tried:

* we can telnet from storm worker -> zookeeper 2181

* we can ping from zookeeper -> storm worker

* we have noticed another environment where we have this code deployed with
same setup and it works as expected with no issues.

Is there some kind of zookeeper connection timeout that is potentially
causing the KafkaUtils to drop the connection or is there some netty
timeout settings we can change that could cause this error?  What else can
cause a ClosedChannelException.


*Storm Worker Stacktrace:*

09.08.2016 10:50:00.405 o.a.s.util [ERROR] Async loop died!

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

        at
org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
~[stormjar.jar:1.22.0]

        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645)
~[storm-core-1.0.1.jar:1.0.1]

        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484)
[storm-core-1.0.1.jar:1.0.1]

        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]

        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]

Caused by: java.nio.channels.ClosedChannelException

        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
~[stormjar.jar:1.22.0]

        at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
~[stormjar.jar:1.22.0]

        at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
~[stormjar.jar:1.22.0]

        at
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
~[stormjar.jar:1.22.0]

        at
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
~[stormjar.jar:1.22.0]

        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75)
~[stormjar.jar:1.22.0]

        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
~[stormjar.jar:1.22.0]

        ... 6 more

09.08.2016 10:50:00.410 o.a.s.d.executor [ERROR]

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

        at
org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
~[stormjar.jar:1.22.0]

        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645)
~[storm-core-1.0.1.jar:1.0.1]

        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484)
[storm-core-1.0.1.jar:1.0.1]

        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]

        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]

Caused by: java.nio.channels.ClosedChannelException

        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
~[stormjar.jar:1.22.0]

        at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
~[stormjar.jar:1.22.0]

        at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
~[stormjar.jar:1.22.0]

        at
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
~[stormjar.jar:1.22.0]

        at
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
~[stormjar.jar:1.22.0]

        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75)
~[stormjar.jar:1.22.0]

        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94)
~[stormjar.jar:1.22.0]

        at
org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
~[stormjar.jar:1.22.0]

        ... 6 more

*Zookeeper stack trace:*

8/9/2016 11:52:07 AM2016-08-09 10:52:07,438 [myid:] - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@900] - Client
attempting to establish new session at /10.42.31.113:50348

8/9/2016 11:52:07 AM2016-08-09 10:52:07,439 [myid:] - INFO
[SyncThread:0:ZooKeeperServer@645] - Established session 0x1566ed464ec03b0
with negotiated timeout 20000 for client /10.42.31.113:50348

8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - WARN
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end
of stream exception

8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
from client sessionid 0x1566ed464ec03ad, likely client has closed socket

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)

8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)

8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
socket connection for client /10.42.31.113:50274 which had sessionid
0x1566ed464ec03ad

8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - WARN
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end
of stream exception

8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
from client sessionid 0x1566ed464ec03af, likely client has closed socket

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)

8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)

8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
socket connection for client /10.42.31.113:50344 which had sessionid
0x1566ed464ec03af

8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - WARN
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end
of stream exception

8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
from client sessionid 0x1566ed464ec03ae, likely client has closed socket

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)

8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)

8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
socket connection for client /10.42.31.113:50346 which had sessionid
0x1566ed464ec03ae

8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - WARN
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end
of stream exception

8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
from client sessionid 0x1566ed464ec03b0, likely client has closed socket

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)

8/9/2016 11:52:09 AM at
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)

8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)

8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
socket connection for client /10.42.31.113:50348 which had sessionid
0x1566ed464ec03b0


Regards

Rama

Mime
View raw message