samza-dev mailing list archives

From Jagadish Venkatraman <jagadish1...@gmail.com>
Subject Re: About reconnect times?
Date Tue, 25 Apr 2017 03:36:01 GMT
Hi ShuQi,

>> There are 4 brokers in our Kafka cluster, when one of the brokers goes
down, Samza can not fetch and send messages any more, is this normal?

I'm not entirely sure this is intrinsically a Samza problem. It is possible
that your topic partition was entirely offline. The Kafka producer will
refresh the metadata before each send attempt based on the *retry.backoff.ms*
setting (default *100ms*).
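
As a rough illustration of how such producer settings could be overridden from a Samza job, here is a minimal sketch that builds keys following the systems.system-name.producer.* pattern quoted further down in this thread. The system name "kafka" and the values 10 and 100 are assumptions chosen for the example, not recommendations, and the class and method names are hypothetical.

```java
import java.util.Properties;

public class SamzaProducerOverrides {
    // Builds Samza-style override keys of the form
    // systems.<system-name>.producer.<kafka-producer-property>.
    static Properties producerOverrides(String systemName, int retries, long retryBackoffMs) {
        Properties props = new Properties();
        String prefix = "systems." + systemName + ".producer.";
        props.setProperty(prefix + "retries", Integer.toString(retries));
        props.setProperty(prefix + "retry.backoff.ms", Long.toString(retryBackoffMs));
        return props;
    }

    public static void main(String[] args) {
        // "kafka" is an assumed system name; 10 retries and 100 ms are illustrative values.
        Properties props = producerOverrides("kafka", 10, 100L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed key=value lines are what would go into the job's properties file; bounding retries this way trades the default "retry forever" behavior for a faster failure.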

In general, the following Kafka settings govern the availability versus
consistency trade-off when a broker is dead:

- Replication factor
- Minimum in-sync replicas (min.insync.replicas; Kafka refers to the set of
in-sync replicas as the ISR)
- Unclean leader election
- Acknowledgements (the producer's acks setting)
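
To make the interaction between these settings concrete, here is a simplified sketch of the decision logic. It is a toy model of the rules, not Kafka's actual implementation, and all class, method, and parameter names are hypothetical. It assumes that a partition has a leader whenever at least one in-sync replica is alive (or, with unclean leader election enabled, any replica at all), and that the minimum in-sync replica count is only enforced for writes sent with acks=all.

```java
public class PartitionAvailability {
    // A leader can be elected from a live in-sync replica; with unclean leader
    // election enabled, a live out-of-sync replica may also take over
    // (at the risk of losing data).
    static boolean hasLeader(int liveInSync, int liveOutOfSync, boolean uncleanLeaderElection) {
        if (liveInSync > 0) {
            return true;
        }
        return uncleanLeaderElection && liveOutOfSync > 0;
    }

    // A produce request needs a leader; with acks=all it additionally needs
    // at least minInSyncReplicas live in-sync replicas.
    static boolean canProduce(int liveInSync, int liveOutOfSync, boolean uncleanLeaderElection,
                              int minInSyncReplicas, boolean acksAll) {
        if (!hasLeader(liveInSync, liveOutOfSync, uncleanLeaderElection)) {
            return false;
        }
        if (liveInSync == 0) {
            // An unclean leader was elected; treat it as the only in-sync replica.
            liveInSync = 1;
        }
        return !acksAll || liveInSync >= minInSyncReplicas;
    }

    public static void main(String[] args) {
        // Replication factor 1 and the only replica's broker is down: partition offline.
        System.out.println(canProduce(0, 0, false, 1, true));
        // Replication factor 3, one broker down, min in-sync replicas 2, acks=all.
        System.out.println(canProduce(2, 0, false, 2, true));
    }
}
```

Under this model, a topic created with a replication factor of 1 becomes unavailable as soon as the broker hosting a partition goes down, which would match the "entirely offline" scenario described above.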



On Mon, Apr 24, 2017 at 6:42 PM, 舒琦 <shuqi@eefung.com> wrote:

> Hi Jagadish,
>
> Thanks for your patient explanation.
>
> I understand now about the exceptions.
>
> But there is still a question. There are 4 brokers in our Kafka cluster,
> when one of the brokers goes down, Samza can not fetch and send messages
> any more, is this normal?
>
> ————————
> Qi Shu
>
>
> > On April 25, 2017, at 01:24, Jagadish Venkatraman <jagadish1989@gmail.com>
> > wrote:
> >
> > Hi ShuQi,
> >
> > My apologies for the late reply.
> >
> > There are 2 categories of exceptions here (both occurring presumably due
> > to your Kafka broker failure):
> >
> > *Producer side:*
> >
> > - This is a network exception from the *Sender* instance inside the
> > *KafkaProducer* used by Samza.
> > - The default number of retries in Samza is MAX_INT. You can configure
> > retries by overriding: systems.system-name.producer.retries
> >
> > More generally, any Kafka property can be overridden as follows:
> > systems.system-name.producer.*
> > Any Kafka producer configuration
> > <http://kafka.apache.org/documentation.html#newproducerconfigs> can be
> > included here. For example, to change the request timeout, you can set
> > systems.system-name.producer.timeout.ms. (There is no need to configure
> > client.id as it is automatically configured by Samza.)
> >
> > *Consumer-side:*
> >
> > - The exception is a timeout triggered from the
> > DefaultFetchSimpleConsumer.
> > It happens in a separate thread where we poll for messages, and hence
> > should not affect the *SamzaContainer* main thread.
> > - The default behavior is to attempt a re-connect, and then re-create the
> > Consumer instance. The number of reconnect attempts is unbounded (and not
> > configurable).
> >
> >
> > Best,
> > Jagadish
> >
> >
> > On Tue, Apr 18, 2017 at 10:51 PM, 舒琦 <shuqi@eefung.com> wrote:
> >
> >> Hi Guys,
> >>
> >>        One of the brokers in our Kafka cluster went down, and Samza
> >> logged the following exceptions:
> >>
> >> 2017-04-19 10:42:36.751 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_status_content_distinct-1] DefaultFetchSimpleConsumer [INFO] Reconnect due to error:
> >> java.net.SocketTimeoutException
> >>        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
> >>        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> >>        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> >>        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81)
> >>        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
> >>        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
> >>        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
> >>        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
> >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
> >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
> >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
> >>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
> >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
> >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
> >>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
> >>        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
> >>        at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
> >>        at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:179)
> >>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
> >>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
> >>        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> >>        at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
> >>        at java.lang.Thread.run(Thread.java:745)
> >> 2017-04-19 10:42:44.507 [kafka-producer-network-thread | samza_producer-canal_status_content_distinct-1] Sender [WARN] Got error produce response with correlation id 64783117 on topic-partition tweets_distinctContent-5, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION
> >>
> >>        Does “2147483646 attempts left” mean that Samza will try to
> >> reconnect to the broken broker 2147483646 times?
> >>        The log also shows that Samza keeps connecting to the broken
> >> broker, and the Samza cluster can’t read any new messages even though
> >> the Kafka cluster is fault tolerant.
> >>
> >>        How can I override this property: “2147483646 attempts left”?
> >>
> >> Thanks.
> >>
> >> ————————
> >> ShuQi
> >>
> >>
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
>
>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
