storm-user mailing list archives

From "Mitchell Rathbun (BLOOMBERG/ 731 LEX)" <mrathb...@bloomberg.net>
Subject Issue killing KafkaSpout with storm-kafka-client 1.1.1
Date Wed, 28 Mar 2018 17:45:19 GMT
We are using the KafkaSpout class provided by version 1.1.1 of storm-kafka-client, along with
version 1.1.1 of Storm and version 0.10.0.0 of kafka-clients. In local mode, we start our
topology with LocalCluster's submitTopology method and bring it down by calling killTopology
followed by shutdown. Every time killTopology runs, the following error is logged:

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
    at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
    at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.wor ... truncated

I did notice that updating just the version of kafka-clients to 1.0.1 made this issue disappear.
Also, this issue only happens in local mode, not cluster mode. Is there something wrong with
how we are bringing down the topology using LocalCluster? Or is this a known issue with version
1.1.1 of storm-kafka-client?
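
To make the symptom concrete: the trace shows KafkaConsumer.close() reaching ensureNotClosed(),
which suggests close() was invoked on a consumer that had already been closed. The sketch below
is only an illustration of that pattern, not the actual Storm or Kafka code. StrictConsumer,
CloseOnce, SimpleCloseable and secondCloseThrows are hypothetical names invented for this
example; StrictConsumer merely imitates a close() that rejects a second call, and CloseOnce is
one possible guard, assuming the resource can be wrapped.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnceSketch {

    /** Hypothetical minimal interface so the example needs no checked exceptions. */
    public interface SimpleCloseable {
        void close();
    }

    /** Hypothetical stand-in for a resource whose close() rejects a second call. */
    public static class StrictConsumer implements SimpleCloseable {
        private boolean closed = false;

        @Override
        public void close() {
            if (closed) {
                // Same message as in the trace; the real check lives in kafka-clients.
                throw new IllegalStateException("This consumer has already been closed.");
            }
            closed = true;
        }
    }

    /** Hypothetical guard: forwards only the first close() to the delegate. */
    public static class CloseOnce implements SimpleCloseable {
        private final SimpleCloseable delegate;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public CloseOnce(SimpleCloseable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void close() {
            // compareAndSet succeeds exactly once, so later calls are no-ops.
            if (closed.compareAndSet(false, true)) {
                delegate.close();
            }
        }
    }

    /** Closes the resource twice and reports whether the second call threw. */
    public static boolean secondCloseThrows(SimpleCloseable c) {
        c.close();
        try {
            c.close();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("strict: " + secondCloseThrows(new StrictConsumer()));
        System.out.println("guarded: " + secondCloseThrows(new CloseOnce(new StrictConsumer())));
    }
}
```

Running main reports that the unguarded stand-in throws on the second close() while the guarded
one does not. Whether something similar explains the difference we see between kafka-clients
0.10.0.0 and 1.0.1 is an assumption on our part.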

