samza-dev mailing list archives

From Jagadish Venkatraman <jagadish1...@gmail.com>
Subject Re: About reconnect times?
Date Mon, 24 Apr 2017 17:24:17 GMT
Hi ShuQi,

My apologies for the late reply.

There are two categories of exceptions here, both presumably caused by the
Kafka broker failure:

*Producer side:*

- This is a network exception from the *Sender* instance inside the
*KafkaProducer* used by Samza.
- The default number of retries in Samza is MAX_INT (2147483647), so
"2147483646 attempts left" is the number of send retries still available
for that produce request. You can configure retries by overriding:
systems.system-name.producer.retries
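
As a sketch, assuming the Kafka system is registered in the job config under the name `kafka` (substitute your own system-name), the override is a single key in the job's properties. The Java below only builds that key with `java.util.Properties`; `ProducerRetriesOverride` and `withRetries` are hypothetical names for illustration, not Samza APIs. It also records the MAX_INT default, which makes it easy to see that the logged 2147483646 is one less than `Integer.MAX_VALUE`.

```java
import java.util.Properties;

// Sketch only: builds the job-config entry that bounds KafkaProducer retries.
// The system name passed in is an assumption; use the name your job registers
// under systems.<system-name>.*.
class ProducerRetriesOverride {
    // Samza's default producer retries, per the reply: MAX_INT.
    static final int DEFAULT_RETRIES = Integer.MAX_VALUE; // 2147483647

    // Equivalent to the line "systems.<systemName>.producer.retries=<retries>"
    // in the job's .properties file.
    static Properties withRetries(String systemName, int retries) {
        Properties props = new Properties();
        props.setProperty("systems." + systemName + ".producer.retries",
                Integer.toString(retries));
        return props;
    }
}
```

For example, `withRetries("kafka", 5)` yields the single entry `systems.kafka.producer.retries=5`, so the producer stops retrying a failed send after five attempts instead of effectively never.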

More generally, any Kafka producer property can be overridden via
systems.system-name.producer.*. Any Kafka producer configuration
<http://kafka.apache.org/documentation.html#newproducerconfigs> can be
included here. For example, to change the request timeout, you can set
systems.system-name.producer.timeout.ms. (There is no need to configure
client.id, as Samza configures it automatically.)
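
The passthrough described above amounts to "take every key under the producer prefix and strip the prefix". The Java below is a hypothetical illustration of that idea only (`ProducerConfigPassthrough` and `kafkaProducerProps` are invented names, and this is not Samza's actual config code): it filters a flat job config down to the keys for one system and returns them as plain Kafka producer property names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the systems.<name>.producer.* passthrough:
// every key under the prefix becomes a Kafka producer property with the
// prefix stripped. NOT Samza's actual implementation.
class ProducerConfigPassthrough {
    static Map<String, String> kafkaProducerProps(Map<String, String> jobConfig,
                                                  String systemName) {
        String prefix = "systems." + systemName + ".producer.";
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : jobConfig.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // "systems.kafka.producer.retries" -> "retries"
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        // Per the reply, Samza sets client.id itself; this sketch does not add it.
        return out;
    }
}
```

Keys for other systems (for example `systems.other.producer.retries`) and non-system keys such as `job.name` are left out, so each system's producer only sees its own overrides.
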

*Consumer side:*

- The exception is a timeout triggered from the DefaultFetchSimpleConsumer.
It happens in a separate thread (the BrokerProxy thread in your log) where
we poll for messages, and hence should not affect the *SamzaContainer*
main thread.
- The default behavior is to attempt a reconnect and then recreate the
Consumer instance. The number of reconnect attempts is unbounded and not
configurable.
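
The reconnect behavior described above can be pictured as an unbounded retry loop with exponential backoff; the stack trace shows the fetch running inside `org.apache.samza.util.ExponentialSleepStrategy.run` on the BrokerProxy thread. The Java below is a hypothetical sketch of that pattern only (`ReconnectLoop`, `runUntilSuccess`, and the delay parameters are invented for illustration), not Samza's implementation: it retries an operation with no attempt limit, calling a reconnect hook and sleeping for a doubling, capped delay after each failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch of an unbounded reconnect loop with capped exponential
// backoff. Modeled loosely on the ExponentialSleepStrategy frame in the stack
// trace; NOT Samza's actual implementation.
class ReconnectLoop {
    private final long initialDelayMs;
    private final double multiplier;
    private final long maxDelayMs;
    // Every delay slept so far, kept so the backoff sequence can be inspected.
    final List<Long> delays = new ArrayList<>();

    ReconnectLoop(long initialDelayMs, double multiplier, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.multiplier = multiplier;
        this.maxDelayMs = maxDelayMs;
    }

    // Runs op until it succeeds. There is no attempt limit: a broker that
    // never comes back keeps this loop retrying forever.
    <T> T runUntilSuccess(Callable<T> op, Runnable reconnect) {
        long delay = initialDelayMs;
        while (true) {
            try {
                return op.call();
            } catch (Exception e) {
                // e.g. a SocketTimeoutException from a fetch against a dead broker:
                // tear down and re-create the connection, then back off.
                reconnect.run();
                delays.add(delay);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
                delay = Math.min((long) (delay * multiplier), maxDelayMs);
            }
        }
    }
}
```

With an initial delay of 1 ms, a multiplier of 2, and a 4 ms cap, four failures produce sleeps of 1, 2, 4, and 4 ms. In Samza, a loop of this kind runs on the BrokerProxy thread rather than the SamzaContainer main thread, which is why the reply says these timeouts should not affect the main thread.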


Best,
Jagadish


On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <shuqi@eefung.com> wrote:

> Hi Guys,
>
>         One of the brokers in our Kafka cluster went down, and Samza got the
> following exception:
>
> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1] DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
> java.net.SocketTimeoutException
>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
>         at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
>         at java.lang.Thread.run(Thread.java:745)
> 2017-04-19 10:42:44.507 [kafka-producer-network-thread | samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error produce response with correlation id 64783117 on topic-partition tweets_distinctContent-5, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
>
>         Does “2147483646 attempts left” mean that Samza will try to
> reconnect to the broken broker 2147483646 times?
>         The log also shows that Samza keeps connecting to the broken broker,
> and the Samza cluster can’t read any new messages even though the Kafka
> cluster is fault tolerant.
>
>         How can I override the setting behind “2147483646 attempts left”?
>
> Thanks.
>
> ————————
> ShuQi
>
>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
