samza-dev mailing list archives

From 舒琦 <sh...@eefung.com>
Subject Re: About reconnect times?
Date Tue, 25 Apr 2017 01:42:59 GMT
Hi Jagadish,

Thanks for your patient explanation.

I understand now about the exceptions.

But I still have a question. There are 4 brokers in our Kafka cluster. When one of the brokers
goes down, Samza cannot fetch or send messages any more. Is this normal?

————————
Qi Shu


> On Apr 25, 2017, at 01:24, Jagadish Venkatraman <jagadish1989@gmail.com> wrote:
> 
> Hi ShuQi,
> 
> My apologies for the late reply.
> 
> There are 2 categories of exceptions here (both presumably caused by
> your Kafka broker failure):
> 
> *Producer side:*
> 
> - This is a network exception from the *Sender* instance inside the
> *KafkaProducer* used by Samza.
> - The default number of retries in Samza is MAX_INT (2147483647). You can
> configure retries by overriding: systems.system-name.producer.retries
> 
> More generally, any Kafka producer configuration
> <http://kafka.apache.org/documentation.html#newproducerconfigs> can be
> overridden with a systems.system-name.producer.* property. For example, to
> change the request timeout, you can set
> systems.system-name.producer.timeout.ms. (There is no need to configure
> client.id as it is automatically configured by Samza.)
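> 
> As a minimal sketch of such overrides, assuming the Kafka system is named
> "kafka" in the job config (the system name and the values below are
> illustrative assumptions, not recommendations):
> 
> ```properties
> # Assumed system name: kafka. Values are placeholders.
> # Cap producer retries instead of the Samza default of MAX_INT.
> systems.kafka.producer.retries=10
> # Request timeout override, using the property named above.
> systems.kafka.producer.timeout.ms=30000
> ```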
> 
> *Consumer-side:*
> 
> - The exception is a timeout triggered from the DefaultFetchSimpleConsumer.
> It happens in a separate thread where we poll for messages, and hence,
> should not affect the *SamzaContainer* main thread.
> - The default behavior is to attempt a re-connect, and then re-create the
> Consumer instance. The number of reconnect attempts is unbounded (and not
> configurable).
> 
> 
> Best,
> Jagadish
> 
> 
> On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <shuqi@eefung.com> wrote:
> 
>> Hi Guys,
>> 
>>        One of the brokers in our Kafka cluster went down, and Samza got the
>> following exceptions:
>> 
>> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
>> 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1]
>> DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
>> java.net.SocketTimeoutException
>>        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(
>> SocketAdaptor.java:211)
>>        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>>        at java.nio.channels.Channels$ReadableByteChannelImpl.read(
>> Channels.java:385)
>>        at org.apache.kafka.common.network.NetworkReceive.
>> readFromReadableChannel(NetworkReceive.java:81)
>>        at kafka.network.BlockingChannel.readCompletely(
>> BlockingChannel.scala:129)
>>        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:
>> 120)
>>        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.
>> scala:86)
>>        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
>> $sendRequest(SimpleConsumer.scala:83)
>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
>> apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
>> SimpleConsumer.scala:131)
>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> SimpleConsumer.scala:131)
>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
>> SimpleConsumer.scala:131)
>>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(
>> DefaultFetchSimpleConsumer.scala:48)
>>        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.
>> defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>        at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$
>> system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
>>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$
>> run$1.apply(BrokerProxy.scala:147)
>>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$
>> run$1.apply(BrokerProxy.scala:134)
>>        at org.apache.samza.util.ExponentialSleepStrategy.run(
>> ExponentialSleepStrategy.scala:82)
>>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(
>> BrokerProxy.scala:133)
>>        at java.lang.Thread.run(Thread.java:745)
>> 2017-04-19 10:42:44.507 [kafka-producer-network-thread |
>> samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error
>> produce response with correlation id 64783117 on topic-partition
>> tweets_distinctContent-5, retrying (2147483646 attempts left). Error:
>> NETWORK_EXCEPTION
>> 
>>        Does “2147483646 attempts left” mean that Samza will try to
>> reconnect to the failed broker 2147483646 times?
>>        The log also shows that Samza keeps connecting to the failed broker,
>> and the Samza cluster can’t read any new messages even though the Kafka
>> cluster is fault tolerant.
>> 
>>        How can I override the property behind “2147483646 attempts left”?
>> 
>> Thanks.
>> 
>> ————————
>> ShuQi
>> 
>> 
> 
> 
> -- 
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University

