storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Han Jing" <hanjingjing1...@gmail.com>
Subject 答复: Storm kafka Spout Stuck When Kafka leader is Down
Date Tue, 27 Mar 2018 09:14:27 GMT
When I use Storm-Kafka-Client 1.2.1, kafka-client 0.10.2.1, kafak-server 1.0.0. leader down,
kafka spout went well.

But When I use Storm-kafka-client 1.2.1,kafka-client1.0.0 (the same as kafka sever version),kafka
–server 1.0.0. Kafka spout stuck when kafka leader down.

 

Is Storm-Kafka-Client 1.2.1 really compatible with kafka-client 1.0.0/1.0.1 ???

I guess there’re some version issue with kafka-client 1.0.0/1.0.1 and kafka-client 1.0.0/1.0.1

发件人: Ajeesh [mailto:ajeeshreloaded@gmail.com] 
发送时间: 2018年3月27日 16:52
收件人: user@storm.apache.org
主题: Re: Storm kafka Spout Stuck When Kafka leader is Down

 

Hi, Use the storm-kafka-client version same as Kafka Server version 

 

On Tue, Mar 27, 2018, 2:04 PM Han Jing <hanjingjing1213@gmail.com <mailto:hanjingjing1213@gmail.com>
> wrote:

Hi All,

       I’m using Storm-Kafka-Client 1.2.1 to read from Kafka sever(1.0.0, 1.0.1).When Kafka
topic leader progress is down, Storm Kafka Spout is stuck, there is no responseon UI website
,even kakfa topic leader is alter to another broker, It’s still stuck, until restart the
kafka server progress. Storm recovered from struk.

       Is Storm-Kafka-Client 1.2.1 compatible with kafka-client 1.0.0/1.0.1?

       

       Here’s some code and Storm log.Please help me with this issue.

Thanks a lot.

       --------------------------------------------------------------

Code:

Kafka-client version is the same the kafka version(1.0.0,1.0.1).

       Kafka is distribute on 3 brokers. There are 2 replicators  and 1 partition for every
Kafka topic. 

       KafkaSpout configureateion is as below. The topology read from just one topic.

TopologyBuilder builder = new TopologyBuilder();
//kafka Servers IP
String bootstrapServers = properties.getProperty("bootstrap.servers");
//Kafka Spout consumer topic
String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
//KafkaSpout
KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder(bootstrapServers,
kafkaReaderTopic)
        .setProp(properties)
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
        .build();
//topology Spout,KAFKA_READER
builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);

       --------------------------------------------------------------

       Log:

       518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] Updated cluster metadata version 6 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw,
nodes = [170.0.0.46:9092 <http://170.0.0.46:9092>  (id: 3 rack: null), 170.0.0.39:9092
<http://170.0.0.39:9092>  (id: 2 rack: null), 170.0.0.38:9092 <http://170.0.0.38:9092>
 (id: 1 rack:        null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader
= 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])

518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1, disconnected=false, requ
      estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1,
correlationId=377894), responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0, errorMessage='null',
error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null)))

518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null)

518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null) dead

518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.46:9092 <http://170.0.0.46:9092>  (id: 3 rack: null)

518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0, disconnected=false, requ
      estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1,
correlationId=377896), responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0, errorMessage='null',
error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null)))

518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null)

518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null) dead

518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 <http://170.0.0.38:9092>  (id: 1 rack: null)

518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0, disconnected=false, requ
      estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1,
correlationId=377897), responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0, errorMessage='null',
error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null)))

518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null)

518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat       chgrp002] Initiating connection
to node 170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null)

518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Fetching committed offsets
for partitions: [tradeMatch-0]

518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9]
[DEBUG] [Consumer clientId=consumer-1, groupId=matchg       rp002] Connection with /170.0.0.39
<http://170.0.0.39>  disconnected

518742 java.net.ConnectException: Connection refused

518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101]

518744         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101]

518745         at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
~[kafka-clients-1.0.0.j       ar:?]

518746         at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
~[kafka-clients-1.0.0.jar:?]

518747         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
[kafka-clients-1.0.0.jar:?]

518748         at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?]

518749         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?]

518750         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
[kafka-clients-1.0.0.ja       r:?]

518751         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
[kafka-clients-1.0.0.ja       r:?]

518752         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
[kafka-clients-1.0.0.ja       r:?]

518753         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472)
[kafka-cli       ents-1.0.0.jar:?]

518754         at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441)
[kafka-clients-1.0.0.jar:?]

518755         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464)
[storm-kafka-client-1.2.1.jar:1.2.1]

518756         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
[storm-kafka-client-1.2.1.jar:1.2.1]

518757         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1]

518758         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
[storm-core-1.2.1.jar:1.2.1]

518759         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]

518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]

518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]

518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat       chgrp002] Node 2147483645 disconnected.

518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [WARN] [Consumer clientId=consumer-1, groupId=matc       hgrp002] Connection to node 2147483645
could not be established. Broker may not be available.

518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1,        groupId=matchgrp002] Cancelled OFFSET_FETCH
request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId=
      377898) with correlation id 377898 due to node 2147483645 being disconnected

518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null) dead

518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat       chgrp002] Sending metadata request
(type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 <http://170.0.0.46:9092>
 (id: 3 rack: null)

518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 <http://170.0.0.38:9092>  (id: 1 rack: null)

518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG]
Updated cluster metadata version 7 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw, nodes =
[170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null), 170.0.0.38:9092 <http://170.0.0.38:9092>
 (id: 1 rack: null), 170.0.0.46:9092 <http://170.0.0.46:9092>  (id: 3 rack:        null)],
partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr
= [1,2], offlineReplicas = [])])

518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Fetching committed offsets
for partitions: [tradeMatch-0]

518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9]
[DEBUG] [Consumer clientId=consumer-1, groupId=matchg       rp002] Connection with /170.0.0.39
<http://170.0.0.39>  disconnected

518742 java.net.ConnectException: Connection refused

518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101]

518744         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101]

518745         at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
~[kafka-clients-1.0.0.j       ar:?]

518746         at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
~[kafka-clients-1.0.0.jar:?]

518747         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
[kafka-clients-1.0.0.jar:?]

518748         at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?]

518749         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?]

518750         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
[kafka-clients-1.0.0.ja       r:?]

518751         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
[kafka-clients-1.0.0.ja       r:?]

518752         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
[kafka-clients-1.0.0.ja       r:?]

518753         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472)
[kafka-cli       ents-1.0.0.jar:?]

518754         at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441)
[kafka-clients-1.0.0.jar:?]

518755         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464)
[storm-kafka-client-1.2.1.jar:1.2.1]

518756         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
[storm-kafka-client-1.2.1.jar:1.2.1]

518757         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1]

518758         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
[storm-core-1.2.1.jar:1.2.1]

518759         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]

518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]

518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]

518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat       chgrp002] Node 2147483645 disconnected.

518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [WARN] [Consumer clientId=consumer-1, groupId=matc       hgrp002] Connection to node 2147483645
could not be established. Broker may not be available.

518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1,        groupId=matchgrp002] Cancelled OFFSET_FETCH
request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId=
      377898) with correlation id 377898 due to node 2147483645 being disconnected

518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null) dead

518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, groupId=mat       chgrp002] Sending metadata request
(type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 <http://170.0.0.46:9092>
 (id: 3 rack: null)

518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 <http://170.0.0.38:9092>  (id: 1 rack: null)

518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG]
Updated cluster metadata version 7 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw, nodes =
[170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null), 170.0.0.38:9092 <http://170.0.0.38:9092>
 (id: 1 rack: null), 170.0.0.46:9092 <http://170.0.0.46:9092>  (id: 3 rack:        null)],
partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr
= [1,2], offlineReplicas = [])])

518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0, disconnected=false, requ
      estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1,
correlationId=377900), responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0, errorMessage='null',
error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null)))

518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null)

518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null) dead

518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
request to broker 170.0.0.38:9092 <http://170.0.0.38:9092>  (id: 1 rack: null)

518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [DEBUG] [Consumer clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1, disconnected=false, requ
      estHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1,
correlationId=377901), responseBody=FindCoordinatorRe       sponse(throttleTimeMs=0, errorMessage='null',
error=NONE, node=170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2 rack: null)))

518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9
9] [INFO] [Consumer clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
170.0.0.39:9092 <http://170.0.0.39:9092>  (id: 2147483645 rack: null)

 


Mime
View raw message