kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: exception processing streams ..
Date Tue, 04 Jul 2017 16:00:14 GMT
Hi Debasish,

It looks like it is possibly a bug in the Kafka Consumer code.
In your streams app you probably want to add an UncaughtExceptionHandler,
i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and terminate the
process when you receive an uncaught exception. I guess Mesos should
automatically restart it for you, then?

Thanks,
Damian

On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debasish@gmail.com> wrote:

> Hi -
>
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
>
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> >
> org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> >
> org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
>
> Looks like some internal processing failed and control went to an
> unexpected path. I have 2 questions ..
>
>    1. any idea why this could happen ? I don't think it's related to Mesos
>    DC/OS though - may be some concurrency issue ?
>    2. how do I recover from such errors ? The stream processor has stopped
>    and the only way out is to restart the application.
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message