storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stig Rohde Døssing <s...@apache.org>
Subject Re: Issue killing KafkaSpout with storm-kafka-client 1.1.1
Date Wed, 28 Mar 2018 20:01:04 GMT
Looks like a bug in the consumer
https://github.com/apache/kafka/commit/031da889bc811200da67568c5779760dcb006238.
The spout closes the consumer both when the topology is deactivated, and
when the spout is closed. For consumers in pre-1.0.0 versions the consumer
close method happened to not be idempotent. I believe both deactivate and
close are called when a local cluster is shut down. When a regular worker
is shut down, I think the JVM running it is just killed, so close doesn't
get called on the spout.

If you need this fixed for older consumer versions, we could null the
consumer reference after closing it, so we don't close it more than once.

2018-03-28 19:45 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <
mrathbun1@bloomberg.net>:

> We are using the KafkaSpout class provided by version 1.1.1 of
> storm-kafka-client, along with version 1.1.1 of Storm and version 0.10.0.0
> of kafka-clients. In local mode, we start our topology using LocalCluster's
> submitTopology method, and bring down the topology by calling the
> killTopology method followed by the shutdown method. Every time
> killTopology is run, the following occurs:
>
> ERROR Slot [SLOT_1024] - Error when processing event
> java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485)
> ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472)
> ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_162]
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855)
> ~[storm-core-1.1.1.jar:1.1.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$mk_executor$reify__
> 4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
> at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.wor ... truncated
>
> I did notice that updating just the version of kafka-clients to 1.0.1 made
> this issue disappear. Also, this issue only happens in local mode, not
> cluster mode. Is there something wrong with how we are bringing down the
> topology using LocalCluster? Or is this a known issue with version 1.1.1 of
> storm-kafka-client?
>
>
>

Mime
View raw message