kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: exception processing streams ..
Date Tue, 04 Jul 2017 17:04:50 GMT
Thanks!

On Tue, Jul 4, 2017 at 10:28 PM, Damian Guy <damian.guy@gmail.com> wrote:

> Yes, System.exit(..)
> streams.close(..) just attempts to stop any running stream threads.
>
> On Tue, 4 Jul 2017 at 17:49 Debasish Ghosh <ghosh.debasish@gmail.com>
> wrote:
>
>> Hi Damien -
>>
>> Just 1 question .. by "terminate the process" do you mean System.exit(..)
>> ?
>> Because streams.close() will not terminate the process - right ?
>>
>> regards.
>>
>> On Tue, Jul 4, 2017 at 9:36 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
>> wrote:
>>
>> > Hi Damian -
>> >
>> > I also thought so .. yes, I will add `KafkaStreams#setUncaughtE
>> > xceptionHandler(...)` and Mesos should restart the process .. Thanks for
>> > your prompt response ..
>> >
>> > regards.
>> >
>> > On Tue, Jul 4, 2017 at 9:30 PM, Damian Guy <damian.guy@gmail.com>
>> wrote:
>> >
>> >> Hi Debasish,
>> >>
>> >> It looks like it is possibly a bug in the Kafka Consumer code.
>> >> In your streams app you probably want to add an
>> UncaughtExceptionHandler,
>> >> i.e, via `KafkaStreams#setUncaughtExceptionHandler(...)` and terminate
>> >> the
>> >> process when you receive an uncaught exception. I guess Mesos should
>> >> automatically restart it for you, then?
>> >>
>> >> Thanks,
>> >> Damian
>> >>
>> >> On Tue, 4 Jul 2017 at 16:40 Debasish Ghosh <ghosh.debasish@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi -
>> >> >
>> >> > I have been running a streaming application on some data set. Things
>> >> > usually run ok. Today I was trying to run the same application on
>> Kafka
>> >> > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster.
>> After
>> >> > running for quite some time, I got the following exception ..
>> >> >
>> >> > Exception in thread "StreamThread-1" java.lang.
>> IllegalStateException:
>> >> > > Attempt to retrieve exception from future which hasn't failed
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.clients.consumer.internals.RequestFuture.
>> >> exception(RequestFuture.java:99)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.clients.consumer.internals.RequestFuture.
>> >> isRetriable(RequestFuture.java:89)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.clients.consumer.internals.ConsumerCoordina
>> >> tor.commitOffsetsSync(ConsumerCoordinator.java:590)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
>> >> KafkaConsumer.java:1124)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamTask.
>> >> commitOffsets(StreamTask.java:296)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamTask$1.
>> >> run(StreamTask.java:79)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>> >> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamTask.
>> >> commit(StreamTask.java:280)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> commitOne(StreamThread.java:807)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> commitAll(StreamThread.java:794)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> maybeCommit(StreamThread.java:769)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> runLoop(StreamThread.java:647)
>> >> > > at
>> >> > >
>> >> > org.apache.kafka.streams.processor.internals.StreamThread.
>> >> run(StreamThread.java:361)
>> >> >
>> >> >
>> >> > Looks like some internal processing failed and control went to an
>> >> > unexpected path. I have 2 questions ..
>> >> >
>> >> >    1. any idea why this could happen ? I don't think it's related to
>> >> Mesos
>> >> >    DC/OS though - may be some concurrency issue ?
>> >> >    2. how do I recover from such errors ? The stream processor has
>> >> stopped
>> >> >    and the only way out is to restart the application.
>> >> >
>> >> > regards.
>> >> >
>> >> > --
>> >> > Debasish Ghosh
>> >> > http://manning.com/ghosh2
>> >> > http://manning.com/ghosh
>> >> >
>> >> > Twttr: @debasishg
>> >> > Blog: http://debasishg.blogspot.com
>> >> > Code: http://github.com/debasishg
>> >> >
>> >>
>> >
>> >
>> >
>> > --
>> > Debasish Ghosh
>> > http://manning.com/ghosh2
>> > http://manning.com/ghosh
>> >
>> > Twttr: @debasishg
>> > Blog: http://debasishg.blogspot.com
>> > Code: http://github.com/debasishg
>> >
>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message