kafka-users mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: Kafka Stream stops polling new messages
Date Tue, 09 May 2017 11:36:34 GMT
Yeah, that's a good point. I won't take any action then.

Eno

On Mon, May 8, 2017 at 10:38 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Hey,
>
> I am not against opening a JIRA, but I am wondering what we should
> describe/report there. If I understand the scenario correctly, João uses
> a custom RocksDB store and calls seek() in user code land. As it is a
> bug in RocksDB that seek takes so long, I am not sure what we could
> improve within Streams to prevent this? The only thing I am seeing
> right now is that we could reduce `max.poll.interval.ms`, which we just
> increased to guard against failures during long state recreation phases.
>
> Any thoughts?
>
>
> -Matthias
>
>
> On 5/3/17 8:48 AM, João Peixoto wrote:
> > That'd be great, as I'm not familiar with the protocol there.
> > On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.thereska@gmail.com> wrote:
> >
> >> Cool, thanks, shall we open a JIRA?
> >>
> >> Eno
> >>> On 3 May 2017, at 16:16, João Peixoto <joao.hartimer@gmail.com> wrote:
> >>>
> >>> Actually I need to apologize, I pasted the wrong issue, I meant to paste
> >>> https://github.com/facebook/rocksdb/issues/261.
> >>>
> >>> RocksDB did not produce a crash report since it didn't actually crash.
> >>> I performed thread dumps on stale and non-stale instances, which
> >>> revealed the common behavior. I also collect and plot several Kafka
> >>> metrics, including "punctuate" durations, so I know the call took a
> >>> long time and eventually finished.
> >>>
> >>> Joao
> >>>
> >>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.thereska@gmail.com> wrote:
> >>>
> >>>> Hi there,
> >>>>
> >>>> Thanks for double checking. Does RocksDB actually crash or produce a
> >>>> crash dump? I’m curious how you know that the issue is
> >>>> https://github.com/facebook/rocksdb/issues/1121, so just double
> >>>> checking with you.
> >>>>
> >>>> If that’s indeed the case, do you mind opening a JIRA (a copy-paste of
> >>>> the below should suffice)? Alternatively let us know and we’ll open it.
> >>>> Sounds like we should handle this better.
> >>>>
> >>>> Thanks,
> >>>> Eno
> >>>>
> >>>>
> >>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.hartimer@gmail.com> wrote:
> >>>>>
> >>>>> I believe I found the root cause of my problem. I seem to have hit this
> >>>>> RocksDB bug: https://github.com/facebook/rocksdb/issues/1121
> >>>>>
> >>>>> In my stream configuration I have a custom transformer used for
> >>>>> deduplicating records, heavily inspired by the
> >>>>> EventDeduplicationLambdaIntegrationTest
> >>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
> >>>>> but adjusted to my use case, with special emphasis on the "punctuate"
> >>>>> method.
> >>>>>
> >>>>> All the stale instances had the main stream thread "RUNNING" the
> >>>>> "punctuate" method of this transformer, which in turn was running
> >>>>> RocksDB "seekToFirst".
> >>>>>
> >>>>> Also, during my debugging one such instance finished the "punctuate"
> >>>>> method, which took ~11h, exactly the time the instance was stuck for.
> >>>>> Changing the backing state store from "persistent" to "inMemory" solved
> >>>>> my issue, at least so far: after several days of running there are no
> >>>>> stuck instances.
> >>>>>
> >>>>> This leads me to ask, shouldn't Kafka detect such a situation fairly
> >>>>> quickly, instead of just stopping polling? My guess is that the
> >>>>> heartbeat thread, which is now separate, continues working fine. Since
> >>>>> by definition the stream runs a message through the whole pipeline,
> >>>>> this step probably just looked like it was VERY slow. Not sure what the
> >>>>> best approach here would be.
> >>>>>
> >>>>> PS: The linked code clearly states "This code is for demonstration
> >>>>> purposes and was not tested for production usage", so that's on me.
> >>>>>
> >>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>
> >>>>>> Did you check the logs? Maybe you need to increase the log level to
> >>>>>> DEBUG to get some more information.
> >>>>>>
> >>>>>> Did you double check committed offsets via
> >>>>>> bin/kafka-consumer-groups.sh?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
> >>>>>>> My stream gets stale after a while and simply does not receive any
> >>>>>>> new messages, i.e. it does not poll.
> >>>>>>>
> >>>>>>> I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
> >>>>>>> brokers are running 0.10.1.1.
> >>>>>>>
> >>>>>>> The stream state is RUNNING and there are no exceptions in the logs.
> >>>>>>>
> >>>>>>> Looking at the JMX metrics, the threads are there and running, just
> >>>>>>> not doing anything.
> >>>>>>> The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
> >>>>>>> (the max time taken to receive a response to a heartbeat request)
> >>>>>>> reads 43,361 seconds (about 12 hours), which is consistent with the
> >>>>>>> time of the hang. Shouldn't this trigger a failure somehow?
> >>>>>>>
> >>>>>>> The stream configuration looks something like this:
> >>>>>>>
> >>>>>>> Properties props = new Properties();
> >>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >>>>>>>           CustomTimestampExtractor.class.getName());
> >>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> >>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
> >>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>>>>>           myConfig.getBrokerList());
> >>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>>>           Serdes.String().getClass().getName());
> >>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>>>           Serdes.ByteArray().getClass().getName());
> >>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>>>>>           myConfig.getCommitIntervalMs()); // 5000
> >>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> >>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
> >>>>>>>           myConfig.getStreamThreadsCount()); // 1
> >>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> >>>>>>>           myConfig.getMaxCacheBytes()); // 524_288_000L
> >>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
> >>>>>>>
> >>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and
> >>>>>>> outputs to another topic.
> >>>>>>>
> >>>>>>> Thanks in advance for the help!
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
> >
>
>
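
The one mitigation raised in this thread is lowering `max.poll.interval.ms`, so that a stream thread stuck inside user code (here, an ~11h "punctuate" call) stops looking healthy to the group. A minimal sketch of such an override follows, assuming the value is passed through the same Properties object as the other consumer settings in the original configuration (the way `ConsumerConfig.MAX_POLL_RECORDS_CONFIG` is set there). The class name, the helper `withPollInterval`, and the 300,000 ms value are hypothetical and only for illustration; the config key is written as a plain string so the sketch compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

final class PollIntervalSketch {

    // Hypothetical helper: copies the given settings and adds an explicit
    // upper bound on the time allowed between two consumer poll() calls.
    static Properties withPollInterval(Properties base, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        // "max.poll.interval.ms" is the consumer setting discussed in the thread.
        props.put("max.poll.interval.ms", maxPollIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("max.poll.records", 50); // same value as in the original config

        // Assumed value: 5 minutes. Pick something comfortably above the
        // longest legitimate processing or state restoration time.
        Properties props = withPollInterval(base, 300_000);
        System.out.println(props);
    }
}
```

The trade-off is the one Matthias points out: a bound that is too low could also evict instances that are legitimately busy recreating state, so the value needs to be chosen with the longest expected restoration phase in mind.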
