storm-user mailing list archives

From "Han Jing" <hanjingjing1...@gmail.com>
Subject Storm kafka Spout Stuck When Kafka leader is Down
Date Tue, 27 Mar 2018 08:33:54 GMT
Hi All,

       I’m using storm-kafka-client 1.2.1 to read from Kafka servers (1.0.0,
1.0.1). When the Kafka process hosting the topic leader goes down, the Storm
Kafka spout gets stuck and shows no response in the Storm UI. Even after the
topic leader moves to another broker, the spout stays stuck until the Kafka
server process is restarted; only then does Storm recover.

       Is storm-kafka-client 1.2.1 compatible with kafka-clients 1.0.0/1.0.1?

       Here is my code and the Storm log. Please help me with this issue.

Thanks a lot.

       --------------------------------------------------------------

Code:

The kafka-clients version matches the Kafka broker version (1.0.0, 1.0.1).

       Kafka runs on 3 brokers. Every Kafka topic has 1 partition and 2
replicas.
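
With 2 replicas per partition, losing one broker should still leave an in-sync replica that can take over as leader. The following is a minimal sketch of that reasoning, using a simplified model of clean leader election (the first assigned replica that is both alive and in the ISR); the class and method names are hypothetical and this is not Kafka's controller code. For contrast, it also shows a hypothetical partition with a single replica on the failed broker, which has no leader until that broker returns.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of clean leader election (an assumption for illustration,
// not Kafka's controller code): walk the assigned replicas in order and pick
// the first one that is both alive and in the ISR.
final class LeaderFailoverSketch {

    /** Returns the broker id that can become leader, or -1 if the partition stays offline. */
    static int electLeader(List<Integer> replicas, Set<Integer> isr, Set<Integer> liveBrokers) {
        for (int replica : replicas) {
            if (isr.contains(replica) && liveBrokers.contains(replica)) {
                return replica;
            }
        }
        return -1; // no live in-sync replica left
    }

    public static void main(String[] args) {
        // Broker 2 (170.0.0.39) is down; brokers 1 and 3 are still running.
        Set<Integer> live = new HashSet<>(Arrays.asList(1, 3));

        // tradeMatch-0 as reported in the log: replicas = [2,1], isr = [1,2].
        int tradeMatchLeader = electLeader(Arrays.asList(2, 1), new HashSet<>(Arrays.asList(1, 2)), live);
        System.out.println("tradeMatch-0 new leader: " + tradeMatchLeader); // 1

        // Hypothetical partition with a single replica on broker 2.
        int singleReplicaLeader = electLeader(Arrays.asList(2), new HashSet<>(Arrays.asList(2)), live);
        System.out.println("single-replica partition leader: " + singleReplicaLeader); // -1
    }
}
```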

       The KafkaSpout configuration is shown below. The topology reads from
just one topic.

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// Kafka broker addresses
String bootstrapServers = properties.getProperty("bootstrap.servers");
// Topic consumed by the Kafka spout
String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
// Kafka spout configuration
KafkaSpoutConfig<String, String> config =
        KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
                .setProp(properties)
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
                .build();
// Spout component KAFKA_READER
builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);
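
The snippet hands the whole properties file to setProp(properties), so Storm-only keys such as storm.kafka.reader.topic also reach the Kafka consumer, which usually logs them as unknown configurations. A minimal sketch of keeping them apart, with a hypothetical consumerProps helper and the assumption that all Storm-only keys start with "storm.":

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: split the loaded properties file so that Storm-only keys
// (assumed here to share the "storm." prefix) are not forwarded to the Kafka consumer.
final class SpoutPropsSketch {

    static final String STORM_PREFIX = "storm.";

    /** Copies every key that does not start with "storm." into a consumer config map. */
    static Map<String, Object> consumerProps(Properties all) {
        Map<String, Object> consumer = new HashMap<>();
        for (String key : all.stringPropertyNames()) {
            if (!key.startsWith(STORM_PREFIX)) {
                consumer.put(key, all.getProperty(key));
            }
        }
        return consumer;
    }

    public static void main(String[] args) {
        // Placeholder values taken from the log above.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "170.0.0.38:9092,170.0.0.39:9092,170.0.0.46:9092");
        properties.setProperty("group.id", "matchgrp002");
        properties.setProperty("storm.kafka.reader.topic", "tradeMatch");

        Map<String, Object> consumer = consumerProps(properties);
        System.out.println(consumer.containsKey("group.id"));                 // true
        System.out.println(consumer.containsKey("storm.kafka.reader.topic")); // false
    }
}
```

Assuming the builder's setProp(Map<String, Object>) overload, the resulting map could be passed instead of the raw Properties object. This only tidies the consumer configuration and is not expected to change what happens when a broker goes down.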

       --------------------------------------------------------------

       Log:

518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 6 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])

518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))

518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)

518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead

518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.46:9092 (id: 3 rack: null)

518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))

518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)

518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead

518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)

518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))

518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)

518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Initiating connection to node 170.0.0.39:9092 (id: 2147483645 rack: null)

518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Fetching committed offsets for partitions: [tradeMatch-0]

518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection with /170.0.0.39 disconnected

518742 java.net.ConnectException: Connection refused
518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101]
518744         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101]
518745         at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-1.0.0.jar:?]
518746         at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?]
518747         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?]
518748         at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?]
518749         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?]
518750         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) [kafka-clients-1.0.0.jar:?]
518751         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) [kafka-clients-1.0.0.jar:?]
518752         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) [kafka-clients-1.0.0.jar:?]
518753         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472) [kafka-clients-1.0.0.jar:?]
518754         at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?]
518755         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464) [storm-kafka-client-1.2.1.jar:1.2.1]
518756         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) [storm-kafka-client-1.2.1.jar:1.2.1]
518757         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1]
518758         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) [storm-core-1.2.1.jar:1.2.1]
518759         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]

518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Node 2147483645 disconnected.

518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection to node 2147483645 could not be established. Broker may not be available.

518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId=377898) with correlation id 377898 due to node 2147483645 being disconnected

518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead

518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending metadata request (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3 rack: null)

518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)

518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 7 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null), 170.0.0.46:9092 (id: 3 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])

518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377900), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))

518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)

518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead

518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)

518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377901), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))

518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
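
Reading the log: every FindCoordinator response names 170.0.0.39 (broker 2), the broker that is down. The consumer marks it dead and asks again roughly every 100 ms, which matches the consumer's default retry.backoff.ms of 100. The blocked call is KafkaConsumer.committed(), reached from KafkaSpout.emitOrRetryTuple; in kafka-clients 1.0.x that method has no timeout parameter. The sketch below decodes the values in the log under stated assumptions. The helper names are hypothetical, and this is not Kafka's own code. It assumes that the consumer addresses the coordinator as Integer.MAX_VALUE - brokerId and that a group maps to a __consumer_offsets partition by a hash modulo the partition count (50 by default).

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.Arrays;

// Helpers for reading the consumer log above. Assumptions (not confirmed by the
// log itself): the consumer addresses the group coordinator as
// Integer.MAX_VALUE - brokerId, and a group is mapped to a __consumer_offsets
// partition by abs(groupId.hashCode()) % partitionCount (50 by default).
final class CoordinatorLogSketch {

    /** Recovers the broker id from the coordinator node id printed in the log. */
    static int brokerIdFromCoordinatorId(int coordinatorNodeId) {
        return Integer.MAX_VALUE - coordinatorNodeId;
    }

    /** __consumer_offsets partition whose leader acts as coordinator for the group. */
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        int positive = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return positive % offsetsTopicPartitions;
    }

    /** Milliseconds between consecutive HH:mm:ss.SSS timestamps. */
    static long[] gapsMillis(String... times) {
        long[] gaps = new long[times.length - 1];
        for (int i = 1; i < times.length; i++) {
            gaps[i - 1] = Duration.between(LocalTime.parse(times[i - 1]), LocalTime.parse(times[i])).toMillis();
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println(brokerIdFromCoordinatorId(2147483645)); // 2, i.e. 170.0.0.39
        System.out.println("matchgrp002 -> __consumer_offsets-" + offsetsPartitionFor("matchgrp002", 50));
        // Timestamps of the "Sending GroupCoordinator request" lines.
        long[] gaps = gapsMillis("22:42:25.124", "22:42:25.225", "22:42:25.327", "22:42:25.427");
        System.out.println(Arrays.toString(gaps)); // [101, 102, 100]
    }
}
```

Suppose the coordinator for matchgrp002 stays on broker 2, for example because its __consumer_offsets partition has no other live in-sync replica. In that case this lookup loop would not end until broker 2 comes back, which would be consistent with the spout recovering only after the Kafka process is restarted.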

 

