storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mitchell Rathbun (BLOOMBERG/ 731 LEX)" <mrathb...@bloomberg.net>
Subject Re: Issue killing KafkaSpout with storm-kafka-client 1.1.1
Date Wed, 28 Mar 2018 21:34:16 GMT
We can just update kafka-clients to 1.0.0. Thank you for the explanation.

From: user@storm.apache.org At: 03/28/18 16:01:09To:  user@storm.apache.org
Subject: Re: Issue killing KafkaSpout with storm-kafka-client 1.1.1

Looks like a bug in the consumer https://github.com/apache/kafka/commit/031da889bc811200da67568c5779760dcb006238.
The spout closes the consumer both when the topology is deactivated, and when the spout is
closed. For consumers in pre-1.0.0 versions the consumer close method happened to not be idempotent.
I believe both deactivate and close are called when a local cluster is shut down. When a regular
worker is shut down, I think the JVM running it is just killed, so close doesn't get called
on the spout.

If you need this fixed for older consumer versions, we could null the consumer reference after
closing it, so we don't close it more than once.

2018-03-28 19:45 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <mrathbun1@bloomberg.net>:

We are using the KafkaSpout class provided by version 1.1.1 of storm-kafka-client, along with
version 1.1.1 of Storm and version 0.10.0.0 of kafka-clients. In local mode, we start our
topology using LocalCluster's submitTopology method, and bring down the topology by calling
the killTopology method followed by the shutdown method. Every time killTopology is run, the
following occurs:

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416)
~[Engine-0.0.1-SNAPSHOT.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
     at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_162]
     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
  at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
      at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
 at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
     at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425)
~[storm-core-1.1.1.jar:1.1.1]
        at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_162]
     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
  at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
      at org.apache.storm.daemon.wor ... truncated

I did notice that updating just the version of kafka-clients to 1.0.1 made this issue disappear.
Also, this issue only happens in local mode, not cluster mode. Is there something wrong with
how we are bringing down the topology using LocalCluster? Or is this a known issue with version
1.1.1 of storm-kafka-client?


Mime
View raw message