samza-dev mailing list archives

From 舒琦 <sh...@eefung.com>
Subject Re: About reconnect times?
Date Tue, 25 Apr 2017 03:48:58 GMT
Hi Jagadish,

Thanks for your help, I’ll check our Kafka first.

————————
舒琦
Address: 6F, Unit 1, Building A4, Lugu Enterprise Plaza, No. 27 Wenxuan Road, Yuelu District, Changsha
Website: http://www.eefung.com
Weibo: http://weibo.com/eefung
Postal code: 410013
Phone: 400-677-0986
Fax: 0731-88519609

> On Apr 25, 2017, at 11:36, Jagadish Venkatraman <jagadish1989@gmail.com> wrote:
> 
> Hi ShuQi,
> 
>>> There are 4 brokers in our Kafka cluster; when one of the brokers goes
>>> down, Samza cannot fetch or send messages any more. Is this normal?
> 
> I'm not entirely sure this is intrinsically a Samza problem. It is possible
> that your topic partition was entirely offline. The Kafka producer will
> refresh its metadata before each send attempt based on the *retry.backoff.ms*
> setting (default *10ms*).
> 
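> As a quick illustration, that backoff can be tuned from a Samza job's
> properties file (the system name "kafka" and the value below are
> assumptions for the example, not recommendations):
> 
>     systems.kafka.producer.retry.backoff.ms=100
> 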
> In general, the following Kafka server-side properties govern the
> availability versus consistency trade-off when a broker is dead (see the
> example sketch just after this list):
> 
> - Replication factor
> - Minimum in-sync replicas (which Kafka refers to as the ISR)
> - Unclean leader election
> - Acknowledgements (acks)
> 
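> A minimal sketch of these settings, with example values chosen purely for
> illustration (not recommendations for your cluster):
> 
>     # broker/topic-level settings (example values)
>     default.replication.factor=3
>     min.insync.replicas=2
>     unclean.leader.election.enable=false
>     # producer acknowledgements
>     acks=all
> 
> With a replication factor of 3, min.insync.replicas=2 and acks=all, losing a
> single broker out of 4 should not by itself make a partition unwritable,
> assuming the partition was fully replicated beforehand.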
> 
> 
> On Mon, Apr 24, 2017 at 6:42 PM, 舒琦 <shuqi@eefung.com> wrote:
> 
>> Hi Jagadish,
>> 
>> Thanks for your patient explanation.
>> 
>> I now understand the exceptions.
>> 
>> But there is still a question: there are 4 brokers in our Kafka cluster;
>> when one of the brokers goes down, Samza cannot fetch or send messages
>> any more. Is this normal?
>> 
>> ————————
>> Qi Shu
>> 
>> 
>>> On Apr 25, 2017, at 01:24, Jagadish Venkatraman <jagadish1989@gmail.com> wrote:
>>> 
>>> Hi ShuQi,
>>> 
>>> My apologies for the late reply.
>>> 
>>> There are 2 categories of exceptions here (both presumably occurring due
>>> to your Kafka broker failure):
>>> 
>>> *Producer side:*
>>> 
>>> - This is a network exception from the *Sender* instance inside the
>>> *KafkaProducer* used by Samza.
>>> - The default number of retries in Samza is MAX_INT. You can configure
>>> retries by overriding systems.system-name.producer.retries
>>> 
>>> More generally, any Kafka property can be overridden as
>>> systems.system-name.producer.*. Any Kafka producer configuration
>>> <http://kafka.apache.org/documentation.html#newproducerconfigs> can be
>>> included here. For example, to change the request timeout, you can set
>>> systems.system-name.producer.timeout.ms. (There is no need to configure
>>> client.id as it is automatically configured by Samza.)
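>>> 
>>> As a rough sketch, such overrides might look like this in the job's
>>> properties file (the system name "kafka" and the values are illustrative
>>> assumptions):
>>> 
>>>     systems.kafka.producer.retries=5
>>>     systems.kafka.producer.timeout.ms=30000
>>> 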
>>> *Consumer-side:*
>>> 
>>> - The exception is a timeout triggered from the DefaultFetchSimpleConsumer.
>>> It happens in a separate thread where we poll for messages, and hence
>>> should not affect the *SamzaContainer* main thread.
>>> - The default behavior is to attempt a re-connect, and then re-create the
>>> Consumer instance. The number of reconnect attempts is unbounded (and not
>>> configurable).
>>> 
>>> 
>>> Best,
>>> Jagadish
>>> 
>>> 
>>> On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <shuqi@eefung.com> wrote:
>>> 
>>>> Hi Guys,
>>>> 
>>>>       One of the brokers in our Kafka cluster went down, and Samza got the
>>>> following exception:
>>>> 
>>>> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1] DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
>>>> java.net.SocketTimeoutException
>>>>        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
>>>>        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>>>>        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>>>>        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>>>>        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>>>>        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>>>>        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>>>>        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>>>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>>>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>>>>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>>>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>>>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>>>>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>>>>        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>>>>        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>>>>        at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
>>>>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>>>>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>>>>        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>>>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>>>>        at java.lang.Thread.run(Thread.java:745)
>>>> 2017-04-19 10:42:44.507 [kafka-producer-network-thread | samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error produce response with correlation id 64783117 on topic-partition tweets_distinctContent-5, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
>>>> 
>>>>       Does “2147483646 attempts left” mean that Samza will try to
>>>> reconnect to the dead broker 2147483646 times?
>>>>       The log also shows that Samza keeps connecting to the dead broker,
>>>> and the Samza cluster can’t read any new messages even though the Kafka
>>>> cluster is fault tolerant.
>>>> 
>>>>       How can I override the setting behind “2147483646 attempts left”?
>>>> 
>>>> Thanks.
>>>> 
>>>> ————————
>>>> ShuQi
>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> Jagadish V,
>>> Graduate Student,
>>> Department of Computer Science,
>>> Stanford University
>> 
>> 
> 
> 
> -- 
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University

