storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Girish Reddy <girishred...@gmail.com>
Subject Re: Help! ClosedChannelException in Storm Kafka Spout during KakfaUtils.getOffset()
Date Tue, 09 Aug 2016 16:04:14 GMT
Hey Rama,

I saw similar errors before when my Kafka's Zookeeper cluster or Kafka
cluster had disk issues or corrupted log. Try checking if those
environments are
fine.

-Girish.

On Tuesday, August 9, 2016, Rama Nallamilli <reaper93@gmail.com> wrote:

> Hi all,
>
> We are seeing an issue on one of our environments with a topology that has
> a Kafka Spout, when nextTuple is called, we end up getting a java.nio.ClosedChannelException.
> This happens when the worker attempts to ask zookeeper for the kafka
> message offset inside KafkaUtils.getOffsetsBefore(…).  The stack trace is
> below.
>
> In the zookeeper log, we are seeing zookeeper clean up its open
> connections with the reason being “EndOfStreamException: Unable to read
> additional data from client sessionid 0x1566ed464ec03ad, likely client has
> closed socket” (the client terminated the connection)
>
> What we have tried:
>
> * we can telnet from storm worker -> zookeeper 2181
>
> * we can ping from zookeeper -> storm worker
>
> * we have noticed another environment where we have this code deployed
> with same setup and it works as expected with no issues.
>
> Is there some kind of zookeeper connection timeout that is potentially
> causing the KafkaUtils to drop the connection or is there some netty
> timeout settings we can change that could cause this error?  What else can
> cause a ClosedChannelException.
>
>
> *Storm Worker Stacktrace:*
>
> 09.08.2016 10:50:00.405 o.a.s.util [ERROR] Async loop died!
>
> java.lang.RuntimeException: java.nio.channels.ClosedChannelException
>
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645)
> ~[storm-core-1.0.1.jar:1.0.1]
>
>         at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484)
> [storm-core-1.0.1.jar:1.0.1]
>
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]
>
> Caused by: java.nio.channels.ClosedChannelException
>
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> ~[stormjar.jar:1.22.0]
>
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
> ~[stormjar.jar:1.22.0]
>
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> $sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:1.22.0]
>
>         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
> ~[stormjar.jar:1.22.0]
>
>         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
> ~[stormjar.jar:1.22.0]
>
>         ... 6 more
>
> 09.08.2016 10:50:00.410 o.a.s.d.executor [ERROR]
>
> java.lang.RuntimeException: java.nio.channels.ClosedChannelException
>
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.daemon.executor$fn__4954$fn__4969$fn__5000.invoke(executor.clj:645)
> ~[storm-core-1.0.1.jar:1.0.1]
>
>         at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484)
> [storm-core-1.0.1.jar:1.0.1]
>
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_65]
>
> Caused by: java.nio.channels.ClosedChannelException
>
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
> ~[stormjar.jar:1.22.0]
>
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
> ~[stormjar.jar:1.22.0]
>
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> $sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:1.22.0]
>
>         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
> ~[stormjar.jar:1.22.0]
>
>         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94)
> ~[stormjar.jar:1.22.0]
>
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
> ~[stormjar.jar:1.22.0]
>
>         ... 6 more
>
> *Zookeeper stack trace:*
>
> 8/9/2016 11:52:07 AM2016-08-09 10:52:07,438 [myid:] - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@900] - Client
> attempting to establish new session at /10.42.31.113:50348
>
> 8/9/2016 11:52:07 AM2016-08-09 10:52:07,439 [myid:] - INFO
> [SyncThread:0:ZooKeeperServer@645] - Established session
> 0x1566ed464ec03b0 with negotiated timeout 20000 for client /
> 10.42.31.113:50348
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - WARN
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> end of stream exception
>
> 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
> from client sessionid 0x1566ed464ec03ad, likely client has closed socket
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:230)
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.
> NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
>
> 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
> socket connection for client /10.42.31.113:50274 which had sessionid
> 0x1566ed464ec03ad
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,036 [myid:] - WARN
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> end of stream exception
>
> 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
> from client sessionid 0x1566ed464ec03af, likely client has closed socket
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:230)
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.
> NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
>
> 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
> socket connection for client /10.42.31.113:50344 which had sessionid
> 0x1566ed464ec03af
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - WARN
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> end of stream exception
>
> 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
> from client sessionid 0x1566ed464ec03ae, likely client has closed socket
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:230)
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.
> NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
>
> 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
> socket connection for client /10.42.31.113:50346 which had sessionid
> 0x1566ed464ec03ae
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - WARN
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> end of stream exception
>
> 8/9/2016 11:52:09 AMEndOfStreamException: Unable to read additional data
> from client sessionid 0x1566ed464ec03b0, likely client has closed socket
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.NIOServerCnxn.doIO(
> NIOServerCnxn.java:230)
>
> 8/9/2016 11:52:09 AM at org.apache.zookeeper.server.
> NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
>
> 8/9/2016 11:52:09 AM at java.lang.Thread.run(Thread.java:745)
>
> 8/9/2016 11:52:09 AM2016-08-09 10:52:09,037 [myid:] - INFO
> [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed
> socket connection for client /10.42.31.113:50348 which had sessionid
> 0x1566ed464ec03b0
>
>
> Regards
>
> Rama
>

Mime
View raw message