samza-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Errors and hung job on broker shutdown
Date Fri, 01 May 2015 22:57:20 GMT
Hmm, it seems your snappy-compressed data is corrupted and so keeps
getting rejected by the broker, which keeps the producer blocked on
close(). I'm not sure how this happens, as I have never seen this error
before (I wrote the new Kafka producer's compression module myself and have
run it through various unit and integration test cases, but did not see
this coming).
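
To illustrate why a batch that is always rejected keeps close() from returning, here is a toy model. It is not Kafka's actual code: the names close, send and CorruptMessageError are made up for the sketch, and the only thing taken from the logs is that CORRUPT_MESSAGE is retried with a very large attempt budget.

```python
class CorruptMessageError(Exception):
    """Stands in for a retriable CORRUPT_MESSAGE produce response (hypothetical)."""


def close(buffered, send, max_attempts):
    """Flush every buffered batch before returning, retrying rejected batches.

    Returns the total number of send attempts. Re-raises the error once a
    batch has used up max_attempts.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempts = 0
    for batch in buffered:
        for attempt in range(1, max_attempts + 1):
            attempts += 1
            try:
                send(batch)
                break  # batch accepted, move on to the next one
            except CorruptMessageError:
                if attempt == max_attempts:
                    raise  # retry budget exhausted, give up on close()


    return attempts


def broker(batch):
    # Hypothetical broker that rejects one specific batch every time.
    if batch == "corrupt":
        raise CorruptMessageError(batch)
```

With a small budget, close(["a", "corrupt"], broker, max_attempts=3) gives up with an error after three tries on the bad batch. With a budget near Integer.MAX_VALUE, as in the "2147483646 attempts left" log line, the same call would effectively never return, which matches the hang.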

Guozhang

On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover <roger.hoover@gmail.com>
wrote:

> Guozhang and Yan,
>
> Thank you both for your responses.  I tried a lot of combinations and I
> think I've determined that it's new producer + snappy that causes the
> issue.
>
> It never happens with the old producer and it never happens with lz4 or no
> compression.  It only happens when a broker gets restarted (or maybe just
> shut down).
>
> The error is not always the same.  I've noticed at least three types of
> errors on the Kafka brokers.
>
> 1) java.io.IOException: failed to read chunk
>    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
>    http://pastebin.com/NZrrEHxU
> 2) java.lang.OutOfMemoryError: Java heap space
>    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
>    http://pastebin.com/yuxk1BjY
> 3) java.io.IOException: PARSING_ERROR(2)
>    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>    http://pastebin.com/yq98Hx49
>
> I've noticed a couple of different behaviors from the Samza producer/job:
> A) It goes into a long retry loop where this message is logged.  I saw this
> with error #1 above.
>
> 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253] Got
> error produce response with correlation id 4878 on topic-partition
> svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
> left). Error: CORRUPT_MESSAGE
>
> B) The job exits with
> org.apache.kafka.common.errors.UnknownServerException (at least when run as
> ThreadJob).  I saw this with error #3 above.
>
> org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 6 to system kafka.
> org.apache.kafka.common.errors.UnknownServerException: The server
> experienced an unexpected error when processing the request
>
> This seems most likely to be a bug in the new Kafka producer.  I'll
> probably file a JIRA for that project.
>
> Thanks,
>
> Roger
>
> On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > And just to answer your first question: SIGTERM with
> > controlled.shutdown.enable=true should be OK for bouncing the broker.
> >
> > Guozhang
> >
> > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Roger,
> > >
> > > I believe Samza 0.9.0 already uses the Java producer.
> > >
> > > Java producer's close() call will try to flush all buffered data to the
> > > brokers before completing the call. However, if some buffered data's
> > > destination partition leader is not known, the producer will block on
> > > refreshing the metadata and then retry sending.
> > >
> > > From the broker logs, it seems it does receive the producer request but
> > > failed to handle it due to "Leader not local" after the bounce:
> > >
> > > --------
> > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > > correlation id 226 from client
> > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
> > > [sys.samza_metrics,0] failed due to Leader not local for partition
> > > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > > correlation id 45671 from client
> > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
> > > partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
> > > due to Leader not local for partition
> > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
> > > (kafka.server.KafkaApis)
> > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > > correlation id 12267 from client
> > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
> > > [sys.samza_metrics,0] failed due to Leader not local for partition
> > > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > --------
> > >
> > > because for these two topic-partitions (sys.samza_metrics,0 and
> > > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), the leader has
> > > been moved to broker id:1,host:sit320w80m7,port:9092. When the producer
> > > gets the error code from the old leader, it should refresh its metadata,
> > > find the new leader (broker 1), and retry sending, but for some reason it
> > > does not refresh its metadata. Without producer logs from the Samza
> > > container I cannot investigate the issue further.
> > >
> > > Which Kafka version does Samza 0.9.0 use?
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang724@gmail.com> wrote:
> > >
> > >> Not sure about the Kafka side. From the Samza side, from your
> > >> description ("does not exit nor does it make any progress"), I think
> > >> the code is stuck in producer.close
> > >> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>;
> > >> otherwise, it would throw a SamzaException and quit the job. So maybe
> > >> some Kafka experts on this mailing list or the Kafka mailing list can
> > >> help.
> > >>
> > >> Fang, Yan
> > >> yanfang724@gmail.com
> > >>
> > >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoover@gmail.com> wrote:
> > >>
> > >> > At error level logging, this was the only entry in the Samza log:
> > >> >
> > >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > >> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
> > >> > Unable to send message from TaskName-Partition 1 to system kafka
> > >> >
> > >> > Here is the log from the Kafka broker that was shut down.
> > >> >
> > >> > http://pastebin.com/afgmLyNF
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Roger
> > >> >
> > >> >
> > >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpan47@gmail.com> wrote:
> > >> >
> > >> > > Roger, could you paste the full log from the Samza container? If you
> > >> > > can figure out which Kafka broker the message was sent to, it would
> > >> > > be helpful if we get the log from the broker as well.
> > >> > >
> > >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoover@gmail.com> wrote:
> > >> > >
> > >> > > > Hi,
> > >> > > >
> > >> > > > I need some help figuring out what's going on.
> > >> > > >
> > >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics
> > >> > > > have replication factor of 2.
> > >> > > >
> > >> > > > I'm bouncing the Kafka broker using SIGTERM (with
> > >> > > > controlled.shutdown.enable=true).  I see the Samza job log this
> > >> > > > message and then hang (does not exit nor does it make any progress).
> > >> > > >
> > >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > >> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
> > >> > > > TaskName-Partition 1 to system kafka
> > >> > > >
> > >> > > > The Kafka consumer (Druid Real-Time node) on the other side then
> > >> > > > barfs on the message:
> > >> > > >
> > >> > > > Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException: Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
> > >> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > >> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > >> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > >> > > > at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > >> > > > at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> > >> > > >
> > >> > > > My questions are:
> > >> > > > 1) What is the right way to bounce a Kafka broker?
> > >> > > > 2) Is this a bug in Samza that the job hangs after producer request
> > >> > > > fails? Has anyone seen this?
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Roger
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
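
The recovery path described in the quoted 7:36 PM message (a produce request fails with "Leader not local", the producer refreshes its metadata, then retries against the new leader) can be sketched roughly as below. This is a simplified in-memory model, not the Kafka client: Cluster, Producer and NotLeaderError are hypothetical names, and a real metadata refresh is a network request rather than a dictionary lookup.

```python
class NotLeaderError(Exception):
    """Stands in for the broker's "Leader not local for partition" error."""


class Cluster:
    """Hypothetical in-memory cluster that maps each partition to its leader."""

    def __init__(self, leaders):
        self.leaders = dict(leaders)

    def produce(self, broker_id, partition, message):
        # Only the current leader accepts writes for a partition.
        if self.leaders[partition] != broker_id:
            raise NotLeaderError(
                f"Leader not local for partition {partition} on broker {broker_id}"
            )
        return broker_id


class Producer:
    def __init__(self, cluster):
        self.cluster = cluster
        self.metadata = dict(cluster.leaders)  # cached view, may go stale

    def send(self, partition, message, max_attempts=3):
        """Send to the cached leader, refreshing metadata after a rejection."""
        for attempt in range(1, max_attempts + 1):
            try:
                return self.cluster.produce(
                    self.metadata[partition], partition, message
                )
            except NotLeaderError:
                if attempt == max_attempts:
                    raise
                # Refresh the cached leader before the next attempt.
                self.metadata[partition] = self.cluster.leaders[partition]


cluster = Cluster({"sys.samza_metrics-0": 0})
producer = Producer(cluster)
cluster.leaders["sys.samza_metrics-0"] = 1  # leadership moves during the bounce
accepted_by = producer.send("sys.samza_metrics-0", b"metrics")
```

If the refresh step is skipped, every retry goes back to broker 0 and fails the same way, which is the symptom visible in the broker log.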

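The consumer-side error in the original report ("Message is corrupt (stored crc = ..., computed crc = ...)") is a checksum mismatch. A minimal sketch of that kind of check follows, assuming a simplified frame of a 4-byte big-endian CRC32 followed by the message body. The frame layout and the helper names frame and ensure_valid are assumptions for illustration and do not reproduce Kafka's exact wire format.

```python
import struct
import zlib


def frame(body: bytes) -> bytes:
    """Prefix the body with its CRC32 as a 4-byte big-endian integer."""
    return struct.pack(">I", zlib.crc32(body) & 0xFFFFFFFF) + body


def ensure_valid(message: bytes) -> None:
    """Recompute the CRC over the body and compare it with the stored value."""
    (stored,) = struct.unpack(">I", message[:4])
    computed = zlib.crc32(message[4:]) & 0xFFFFFFFF
    if stored != computed:
        raise ValueError(
            f"Message is corrupt (stored crc = {stored}, computed crc = {computed})"
        )


good = frame(b"hello")
ensure_valid(good)  # passes silently

# Flip one bit in the body to simulate corruption after the CRC was written.
bad = good[:-1] + bytes([good[-1] ^ 0x01])
```

Calling ensure_valid(bad) raises ValueError, because the stored checksum no longer matches the body. The check only detects that the bytes changed after the checksum was computed; it does not say where the corruption was introduced.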


-- 
-- Guozhang
