samza-dev mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Errors and hung job on broker shutdown
Date Sun, 03 May 2015 23:37:41 GMT
Garry,

Just wondering: does this error not occur with Gzip compression, or do you
see it with any compression scheme?

Guozhang

On Sun, May 3, 2015 at 2:32 AM, Garry Turkington <
g.turkington@improvedigital.com> wrote:

> Hi,
>
> Just to add another data point, I've occasionally been seeing the first
> error with a non-Samza app that uses the new Kafka producer with Snappy
> compression. I was going to post to the Kafka list, but I haven't really
> narrowed down the conditions yet. It seems to happen most often a few
> minutes after a broker has restarted or had its ZK session time out during
> periods of very heavy load, but I need to do more troubleshooting before I
> have something less vague to report there.
>
> Garry
>
> -----Original Message-----
> From: Guozhang Wang [mailto:wangguoz@gmail.com]
> Sent: 01 May 2015 23:57
> To: dev@samza.apache.org
> Subject: Re: Errors and hung job on broker shutdown
>
> Hmm, it seems your Snappy-compressed data is corrupted and therefore keeps
> getting rejected by the broker, which keeps the producer blocked on
> close(). I'm not sure how this happens, as I have never seen this error
> before (I wrote the new Kafka producer's compression module myself and have
> run it through various unit and integration tests, but never saw this).
>
> Guozhang
>
> On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover <roger.hoover@gmail.com>
> wrote:
>
> > Guozhang and Yan,
> >
> > Thank you both for your responses. I tried a lot of combinations and
> > I think I've determined that it's the new producer + Snappy that causes
> > the issue.
> >
> > It never happens with the old producer, and it never happens with lz4
> > or no compression. It only happens when a broker gets restarted (or
> > maybe just shut down).
> >
> > The error is not always the same. I've noticed at least three types
> > of errors on the Kafka brokers.
> >
> > 1) java.io.IOException: failed to read chunk
> >    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
> >    http://pastebin.com/NZrrEHxU
> > 2) java.lang.OutOfMemoryError: Java heap space
> >    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
> >    http://pastebin.com/yuxk1BjY
> > 3) java.io.IOException: PARSING_ERROR(2)
> >    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> >    http://pastebin.com/yq98Hx49
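Editorial sketch (hypothetical, not snappy-java code): errors #1 and #2 both come from SnappyInputStream.hasNextChunk, which is consistent with a stream read as a sequence of length-prefixed chunks. The Java below assumes a simplified framing of a 4-byte big-endian length followed by that many bytes; this is an assumption about the general shape, not the exact snappy-java stream format. It shows how a corrupted length field can surface either as a short read ("failed to read chunk") or, if the length is trusted blindly, as a huge allocation that exhausts the heap. ChunkFraming and the maxChunkSize guard are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified length-prefixed chunk format
// (4-byte big-endian length, then that many bytes). NOT snappy-java's code.
class ChunkFraming {

    // Writes each chunk as <int length><bytes>.
    static byte[] frame(List<byte[]> chunks) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (byte[] chunk : chunks) {
            out.writeInt(chunk.length);
            out.write(chunk);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads chunks back, rejecting implausible lengths before allocating.
    // Without the maxChunkSize check, a garbage length such as 0x7F000003
    // would go straight into new byte[length] and could exhaust the heap.
    static List<byte[]> readChunks(byte[] data, int maxChunkSize) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<byte[]> chunks = new ArrayList<>();
        while (in.available() > 0) {
            if (in.available() < 4) {
                throw new IOException("truncated chunk header");
            }
            int length = in.readInt();
            if (length < 0 || length > maxChunkSize) {
                throw new IOException("implausible chunk length " + length);
            }
            byte[] chunk = new byte[length];
            try {
                in.readFully(chunk); // fewer bytes than promised -> EOFException
            } catch (EOFException e) {
                throw new IOException("failed to read chunk", e);
            }
            chunks.add(chunk);
        }
        return chunks;
    }
}
```

The guard is the design point: validating a length before allocating turns a garbage header into an ordinary IOException instead of an OutOfMemoryError. It does not make corrupted data readable; it only makes the failure mode clearer.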
> >
> > I've noticed a couple of different behaviors from the Samza producer/job:
> > A) It goes into a long retry loop where this message is logged. I saw
> > this with error #1 above.
> >
> > 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253]
> > Got error produce response with correlation id 4878 on topic-partition
> > svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646
> > attempts left). Error: CORRUPT_MESSAGE
> >
> > B) The job exits with
> > org.apache.kafka.common.errors.UnknownServerException (at least when
> > run as ThreadJob). I saw this with error #3 above.
> >
> > org.apache.samza.SamzaException: Unable to send message from
> > TaskName-Partition 6 to system kafka.
> > org.apache.kafka.common.errors.UnknownServerException: The server
> > experienced an unexpected error when processing the request
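The retry count in the log in (A) is informative: 2147483646 is Integer.MAX_VALUE - 1 (2147483647 - 1), which is consistent with the producer's retries setting being Integer.MAX_VALUE. Since CORRUPT_MESSAGE was evidently treated as retriable here (the log says "retrying"), a batch the broker keeps rejecting is resent effectively forever, which would also keep close() from returning. The simulation below is a hypothetical model of that loop, not Kafka's Sender; RetrySimulation, ErrorCode, and Outcome are invented names, its bookkeeping is chosen so the first retry reports retries - 1 attempts left to match the log line, and maxSimulated exists only so the simulation itself terminates.

```java
import java.util.function.IntFunction;

// Hypothetical model of a producer resending one batch; NOT Kafka's Sender.
class RetrySimulation {

    enum ErrorCode { NONE, CORRUPT_MESSAGE }

    // Result of trying to deliver one batch.
    static final class Outcome {
        final boolean delivered;
        final int attempts;           // total sends, including the first
        final long firstAttemptsLeft; // "attempts left" reported on the first retry, or -1

        Outcome(boolean delivered, int attempts, long firstAttemptsLeft) {
            this.delivered = delivered;
            this.attempts = attempts;
            this.firstAttemptsLeft = firstAttemptsLeft;
        }
    }

    // retries: configured retry budget (the log suggests Integer.MAX_VALUE).
    // broker: stub mapping a 0-based attempt number to the broker's reply.
    // maxSimulated: cap that keeps this simulation from running forever.
    static Outcome send(int retries, IntFunction<ErrorCode> broker, int maxSimulated) {
        long firstAttemptsLeft = -1;
        int attempt = 0;
        while (attempt < maxSimulated) {
            ErrorCode error = broker.apply(attempt);
            if (error == ErrorCode.NONE) {
                return new Outcome(true, attempt + 1, firstAttemptsLeft);
            }
            if (attempt >= retries) {
                // Budget exhausted: the batch fails and the caller sees an error.
                return new Outcome(false, attempt + 1, firstAttemptsLeft);
            }
            if (attempt == 0) {
                // Chosen so the first retry reports retries - 1, like the log line.
                firstAttemptsLeft = (long) retries - 1;
            }
            attempt++;
        }
        // Still retrying when the simulation cap was reached: in a real
        // producer this is the "hang" seen on close().
        return new Outcome(false, attempt, firstAttemptsLeft);
    }
}
```

With a small budget (say retries = 3) the same persistent error fails the batch after four attempts, which would let the caller see an exception instead of a hang.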
> >
> > This seems most likely to be a bug in the new Kafka producer.  I'll
> > probably file a JIRA for that project.
> >
> > Thanks,
> >
> > Roger
> >
> > On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> > > And just to answer your first question: SIGTERM with
> > > controlled.shutdown.enable=true should be OK for bouncing the broker.
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > >
> > > > Roger,
> > > >
> > > > I believe Samza 0.9.0 already uses the Java producer.
> > > >
> > > > The Java producer's close() call will try to flush all buffered data
> > > > to the brokers before completing. However, if the leader of some
> > > > buffered data's destination partition is not known, the producer
> > > > will block on refreshing the metadata and then retry sending.
> > > >
> > > > From the broker logs, it seems the broker does receive the producer
> > > > request but fails to handle it due to "Leader not local" after the
> > > > bounce:
> > > >
> > > > --------
> > > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > > > correlation id 226 from client
> > > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on
> > > > partition [sys.samza_metrics,0] failed due to Leader not local for
> > > > partition [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > > > correlation id 45671 from client
> > > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
> > > > partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0]
> > > > failed due to Leader not local for partition
> > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
> > > > (kafka.server.KafkaApis)
> > > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > > > correlation id 12267 from client
> > > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
> > > > [sys.samza_metrics,0] failed due to Leader not local for partition
> > > > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > > --------
> > > >
> > > > because for these two topic-partitions (sys.samza_metrics,0 and
> > > > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), the leader
> > > > has moved to broker id:1,host:sit320w80m7,port:9092. When the producer
> > > > gets the error code from the old leader, it should refresh its
> > > > metadata, learn that the new leader is broker 1, and retry sending,
> > > > but for some reason it does not refresh its metadata. Without the
> > > > producer logs from the Samza container I cannot investigate the issue
> > > > further.
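The recovery path described here (the old leader answers "Leader not local", the producer refreshes metadata, then resends to the new leader) can be modelled in a few lines. This is a hypothetical sketch with invented names (LeaderRefreshSketch, clusterLeaders, cachedLeaders), not the Kafka client's API; it treats a metadata refresh as an instantaneous copy of the cluster's leader map and counts a refresh for an unknown leader as one attempt.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "refresh metadata on NOT_LEADER, then resend".
// Names are illustrative, not Kafka client APIs.
class LeaderRefreshSketch {

    // Authoritative view: which broker currently leads each partition.
    final Map<String, Integer> clusterLeaders = new HashMap<>();
    // Producer's possibly stale cache of the same information.
    final Map<String, Integer> cachedLeaders = new HashMap<>();
    int metadataRefreshes = 0;

    // Broker-side check: only the current leader accepts the write;
    // any other broker answers "Leader not local".
    boolean brokerAccepts(int brokerId, String partition) {
        Integer leader = clusterLeaders.get(partition);
        return leader != null && leader == brokerId;
    }

    void refreshMetadata() {
        cachedLeaders.clear();
        cachedLeaders.putAll(clusterLeaders);
        metadataRefreshes++;
    }

    // Returns the broker that finally accepted the write, or -1 if retries ran out.
    int send(String partition, int retries) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            Integer target = cachedLeaders.get(partition);
            if (target == null) {
                // Leader unknown: block on a metadata refresh (counts as an attempt here).
                refreshMetadata();
                continue;
            }
            if (brokerAccepts(target, partition)) {
                return target;
            }
            // Error from the old leader: refresh before the next attempt.
            refreshMetadata();
        }
        return -1;
    }
}
```

In the scenario from the broker log, the cached leader for [sys.samza_metrics,0] is broker 0, controlled shutdown moves leadership to broker 1, and the first send is rejected. The refresh on that error is what lets the second attempt succeed; if the refresh never happens, as the logs above suggest, every retry goes back to broker 0.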
> > > >
> > > > Which Kafka version does Samza 0.9.0 use?
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang724@gmail.com>
> > > > wrote:
> > > >
> > > >> Not sure about the Kafka side. From the Samza side, based on your
> > > >> description ("does not exit nor does it make any progress"), I
> > > >> think the code is stuck in producer.close
> > > >> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>;
> > > >> otherwise, it would throw a SamzaException and quit the job. So maybe
> > > >> some Kafka experts on this mailing list or the Kafka mailing list can
> > > >> help.
> > > >>
> > > >> Fang, Yan
> > > >> yanfang724@gmail.com
> > > >>
> > > >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoover@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > At error-level logging, this was the only entry in the Samza log:
> > > >> >
> > > >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > > >> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
> > > >> > Unable to send message from TaskName-Partition 1 to system kafka
> > > >> >
> > > >> > Here is the log from the Kafka broker that was shut down.
> > > >> >
> > > >> > http://pastebin.com/afgmLyNF
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Roger
> > > >> >
> > > >> >
> > > >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpan47@gmail.com> wrote:
> > > >> >
> > > >> > > Roger, could you paste the full log from the Samza container? If
> > > >> > > you can figure out which Kafka broker the message was sent to, it
> > > >> > > would be helpful to get the log from that broker as well.
> > > >> > >
> > > >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoover@gmail.com>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi,
> > > >> > > >
> > > >> > > > I need some help figuring out what's going on.
> > > >> > > >
> > > >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN. All the topics
> > > >> > > > have a replication factor of 2.
> > > >> > > >
> > > >> > > > I'm bouncing the Kafka broker using SIGTERM (with
> > > >> > > > controlled.shutdown.enable=true). I see the Samza job log this
> > > >> > > > message and then hang (does not exit nor does it make any progress).
> > > >> > > >
> > > >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > > >> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
> > > >> > > > TaskName-Partition 1 to system kafka
> > > >> > > >
> > > >> > > > The Kafka consumer (Druid Real-Time node) on the other side then
> > > >> > > > barfs on the message:
> > > >> > > >
> > > >> > > > Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException:
> > > >> > > > Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
> > > >> > > >   at kafka.message.Message.ensureValid(Message.scala:166)
> > > >> > > >   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > > >> > > >   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > > >> > > >   at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > > >> > > >   at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > > >> > > >   at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > > >> > > >   at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
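The consumer-side failure is a checksum mismatch: Message.ensureValid recomputes a CRC over the received bytes and compares it with the CRC stored in the message. A minimal sketch, assuming a simplified layout of a 4-byte big-endian CRC32 followed by the bytes it covers (in Kafka 0.8 the covered region holds the magic byte, attributes, key, and value); CrcCheck, withCrc, and this ensureValid are illustrative names, not Kafka's code. The stored CRC is read as an unsigned 32-bit value, which is why a computed crc above Integer.MAX_VALUE (3898271689) can appear in the log.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative CRC check in the spirit of Message.ensureValid.
// Assumed layout (simplified): 4-byte CRC32, then the bytes it covers.
class CrcCheck {

    // Prepends the CRC32 of body, stored as a 4-byte big-endian value.
    static byte[] withCrc(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt((int) crc.getValue()); // getValue() fits in 32 unsigned bits
        buf.put(body);
        return buf.array();
    }

    // Throws if the stored CRC does not match one recomputed over the body.
    static void ensureValid(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        long stored = buf.getInt() & 0xFFFFFFFFL; // unsigned, like the log's values
        CRC32 crc = new CRC32();
        crc.update(message, 4, message.length - 4);
        long computed = crc.getValue();
        if (stored != computed) {
            throw new IllegalStateException("Message is corrupt (stored crc = " + stored
                    + ", computed crc = " + computed + ")");
        }
    }
}
```

Because CRC32 detects every single-bit error, any bit flip in the covered bytes triggers the exception. The check only reports corruption; it cannot say whether the bytes were damaged by the producer, in transit, or on the broker.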
> > > >> > > >
> > > >> > > > My questions are:
> > > >> > > > 1) What is the right way to bounce a Kafka broker?
> > > >> > > > 2) Is it a bug in Samza that the job hangs after a producer
> > > >> > > > request fails? Has anyone seen this?
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > >
> > > >> > > > Roger
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
