kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: exception processing streams ..
Date Tue, 04 Jul 2017 16:49:15 GMT
Hi Damien -

Just 1 question .. by "terminate the process" do you mean System.exit(..) ?
Because streams.close() will not terminate the process - right ?

regards.

On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> Hi Damian -
>
> I also thought so .. yes, I will add `KafkaStreams#setUncaughtE
> xceptionHandler(...)` and Mesos should restart the process .. Thanks for
> your prompt response ..
>
> regards.
>
> On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> It looks like it is possibly a bug in the Kafka Consumer code.
>> In your streams app you probably want to add an UncaughtExceptionHandler,
>> i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and terminate
>> the
>> process when you receive an uncaught exception. I guess Mesos should
>> automatically restart it for you, then?
>>
>> Thanks,
>> Damian
>>
>> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debasish@gmail.com>
>> wrote:
>>
>> > Hi -
>> >
>> > I have been running a streaming application on some data set. Things
>> > usually run ok. Today I was trying to run the same application on Kafka
>> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
>> > running for quite some time, I got the following exception ..
>> >
>> > Exception in thread "StreamThread-1" java.lang.IllegalStateException:
>> > > Attempt to retrieve exception from future which hasn't failed
>> > > at
>> > >
>> > org.apache.kafka.clients.consumer.internals.RequestFuture.
>> exception(RequestFuture.java:99)
>> > > at
>> > >
>> > org.apache.kafka.clients.consumer.internals.RequestFuture.
>> isRetriable(RequestFuture.java:89)
>> > > at
>> > >
>> > org.apache.kafka.clients.consumer.internals.ConsumerCoordina
>> tor.commitOffsetsSync(ConsumerCoordinator.java:590)
>> > > at
>> > >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
>> KafkaConsumer.java:1124)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamTask.
>> commitOffsets(StreamTask.java:296)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamTask$1.
>> run(StreamTask.java:79)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamTask.
>> commit(StreamTask.java:280)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> commitOne(StreamThread.java:807)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> commitAll(StreamThread.java:794)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> maybeCommit(StreamThread.java:769)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> runLoop(StreamThread.java:647)
>> > > at
>> > >
>> > org.apache.kafka.streams.processor.internals.StreamThread.
>> run(StreamThread.java:361)
>> >
>> >
>> > Looks like some internal processing failed and control went to an
>> > unexpected path. I have 2 questions ..
>> >
>> >    1. any idea why this could happen ? I don't think it's related to
>> Mesos
>> >    DC/OS though - may be some concurrency issue ?
>> >    2. how do I recover from such errors ? The stream processor has
>> stopped
>> >    and the only way out is to restart the application.
>> >
>> > regards.
>> >
>> > --
>> > Debasish Ghosh
>> > http://manning.com/ghosh2
>> > http://manning.com/ghosh
>> >
>> > Twttr: @debasishg
>> > Blog: http://debasishg.blogspot.com
>> > Code: http://github.com/debasishg
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message