kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: exception processing streams ..
Date Wed, 05 Jul 2017 23:44:33 GMT
I think Damian's finding is correct regarding the consumer bug, and there
is a PR being worked on already:
https://github.com/apache/kafka/pull/3489/files
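
Until that fix is released, the workaround discussed below (register an uncaught exception handler and terminate the process so that Mesos restarts it) could look roughly like this sketch. It assumes the 0.10.2.x API, where `KafkaStreams#setUncaughtExceptionHandler(...)` accepts a plain `Thread.UncaughtExceptionHandler`. The class name `ExitOnUncaught`, the helper `exitingHandler` and the exit status 1 are made up for illustration, and a plain thread stands in for the stream thread so the example runs without a Kafka cluster.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

public class ExitOnUncaught {

    // Hypothetical helper: returns a handler that logs the failure and then
    // invokes the supplied exit action with the given status code.
    static Thread.UncaughtExceptionHandler exitingHandler(int status, IntConsumer exit) {
        return (thread, error) -> {
            System.err.println("Uncaught exception in " + thread.getName() + ": " + error);
            exit.accept(status);
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // In a real Streams app (assumption: 0.10.2.x API) this would be
        //   streams.setUncaughtExceptionHandler(exitingHandler(1, System::exit));
        // Here a plain thread stands in for a stream thread, and the exit
        // action only records the status so the demo can finish normally.
        AtomicInteger requested = new AtomicInteger(-1);
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated stream thread failure");
        }, "StreamThread-demo");
        worker.setUncaughtExceptionHandler(exitingHandler(1, requested::set));
        worker.start();
        worker.join();
        System.out.println("exit requested with status " + requested.get());
    }
}
```

Passing the exit action in as a parameter is only there to make the sketch testable; in the application itself `System::exit` would be the natural choice, since `streams.close(..)` alone does not end the process.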


Guozhang

On Tue, Jul 4, 2017 at 10:04 AM, Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> Thanks!
>
> On Tue, Jul 4, 2017 at 10:28 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Yes, System.exit(..).
> > streams.close(..) just attempts to stop any running stream threads.
> >
> > On Tue, 4 Jul 2017 at 17:49 Debasish Ghosh <ghosh.debasish@gmail.com>
> > wrote:
> >
> >> Hi Damian -
> >>
> >> Just 1 question .. by "terminate the process" do you mean System.exit(..)?
> >> Because streams.close() will not terminate the process - right?
> >>
> >> regards.
> >>
> >> On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
> >> wrote:
> >>
> >> > Hi Damian -
> >> >
> >> > I also thought so .. yes, I will add
> >> > `KafkaStreams#setUncaughtExceptionHandler(...)` and Mesos should restart
> >> > the process .. Thanks for your prompt response ..
> >> >
> >> > regards.
> >> >
> >> > On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian.guy@gmail.com> wrote:
> >> >
> >> >> Hi Debasish,
> >> >>
> >> >> It looks like it is possibly a bug in the Kafka Consumer code.
> >> >> In your streams app you probably want to add an UncaughtExceptionHandler,
> >> >> i.e., via `KafkaStreams#setUncaughtExceptionHandler(...)`, and terminate
> >> >> the process when you receive an uncaught exception. I guess Mesos should
> >> >> automatically restart it for you, then?
> >> >>
> >> >> Thanks,
> >> >> Damian
> >> >>
> >> >> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debasish@gmail.com>
> >> >> wrote:
> >> >>
> >> >> > Hi -
> >> >> >
> >> >> > I have been running a streaming application on some data set. Things
> >> >> > usually run ok. Today I was trying to run the same application on Kafka
> >> >> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> >> >> > running for quite some time, I got the following exception ..
> >> >> >
> >> >> > Exception in thread "StreamThread-1" java.lang.IllegalStateException: Attempt to retrieve exception from future which hasn't failed
> >> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> >> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> >> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> >> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >> >> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> >> >
> >> >> >
> >> >> > Looks like some internal processing failed and control went to an
> >> >> > unexpected path. I have 2 questions ..
> >> >> >
> >> >> >    1. Any idea why this could happen? I don't think it's related to Mesos
> >> >> >    DC/OS though - maybe some concurrency issue?
> >> >> >    2. How do I recover from such errors? The stream processor has stopped
> >> >> >    and the only way out is to restart the application.
> >> >> >
> >> >> > regards.
> >> >> >
> >> >> > --
> >> >> > Debasish Ghosh
> >> >> > http://manning.com/ghosh2
> >> >> > http://manning.com/ghosh
> >> >> >
> >> >> > Twttr: @debasishg
> >> >> > Blog: http://debasishg.blogspot.com
> >> >> > Code: http://github.com/debasishg
> >> >> >



-- 
-- Guozhang
