kafka-users mailing list archives

From João Peixoto <joao.harti...@gmail.com>
Subject Re: Kafka Stream stops polling new messages
Date Wed, 03 May 2017 15:48:24 GMT
That'd be great, as I'm not familiar with the protocol there.
On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.thereska@gmail.com> wrote:

> Cool, thanks, shall we open a JIRA?
>
> Eno
> > On 3 May 2017, at 16:16, João Peixoto <joao.hartimer@gmail.com> wrote:
> >
> > Actually, I need to apologize: I pasted the wrong issue. I meant to paste
> > https://github.com/facebook/rocksdb/issues/261.
> >
> > RocksDB did not produce a crash report since it didn't actually crash. I
> > performed thread dumps on stale and non-stale instances, which revealed the
> > common behavior. I also collect and plot several Kafka metrics, including
> > "punctuate" durations, so I know it took a long time and eventually
> > finished.
> >
> > Joao
> >
> > On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.thereska@gmail.com> wrote:
> >
> >> Hi there,
> >>
> >> Thanks for double-checking. Does RocksDB actually crash or produce a crash
> >> dump? I’m curious how you know that the issue is
> >> https://github.com/facebook/rocksdb/issues/1121, so I’m just
> >> double-checking with you.
> >>
> >> If that’s indeed the case, do you mind opening a JIRA (a copy-paste of the
> >> below should suffice)? Alternatively let us know and we’ll open it. Sounds
> >> like we should handle this better.
> >>
> >> Thanks,
> >> Eno
> >>
> >>
> >>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.hartimer@gmail.com> wrote:
> >>>
> >>> I believe I found the root cause of my problem. I seem to have hit this
> >>> RocksDB bug https://github.com/facebook/rocksdb/issues/1121
> >>>
> >>> In my stream configuration I have a custom transformer used for
> >>> deduplicating records, heavily inspired by the
> >>> EventDeduplicationLambdaIntegrationTest
> >>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
> >>> but adjusted to my use case, with special emphasis on the "punctuate" method.
> >>>
> >>> In all the stale instances, the main stream thread was "RUNNING" the
> >>> "punctuate" method of this transformer, which in turn was running RocksDB
> >>> "seekToFirst".
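For readers unfamiliar with the pattern, here is a minimal in-memory sketch of what a deduplicating step with periodic expiry does, assuming event IDs are strings and each ID is remembered for a fixed retention window. It uses a plain `HashMap` in place of a Kafka Streams state store; the class and method names (`DedupWindow`, `seenBefore`, `purgeExpired`) are hypothetical and are not the API of the Confluent example or of Kafka Streams. What it illustrates is that a purge which walks the store from its first entry does work proportional to the total number of stored IDs on every call.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical in-memory model of an event-ID deduplicator with time-based expiry. */
public class DedupWindow {
    private final Map<String, Long> seen = new HashMap<>(); // eventId -> timestamp when first seen
    private final long retentionMs;

    public DedupWindow(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Returns true if eventId was already recorded; otherwise records it and returns false. */
    public boolean seenBefore(String eventId, long timestampMs) {
        if (seen.containsKey(eventId)) {
            return true;
        }
        seen.put(eventId, timestampMs);
        return false;
    }

    /**
     * Full scan over every stored entry, analogous to a punctuate() that iterates the whole store.
     * Its cost grows with the number of stored IDs, not with the number that actually expire.
     * Returns how many entries were removed.
     */
    public int purgeExpired(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > retentionMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return seen.size();
    }

    public static void main(String[] args) {
        DedupWindow dedup = new DedupWindow(1_000L);
        System.out.println(dedup.seenBefore("a", 0L));     // false: first occurrence
        System.out.println(dedup.seenBefore("a", 10L));    // true: duplicate within the window
        System.out.println(dedup.seenBefore("b", 500L));   // false
        System.out.println(dedup.purgeExpired(1_200L));    // 1: only "a" is older than 1000 ms
        System.out.println(dedup.seenBefore("a", 1_300L)); // false: "a" was purged, so it counts as new
    }
}
```

With a persistent store, that full walk presumably corresponds to the iteration in which the thread dumps caught RocksDB in "seekToFirst".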
> >>>
> >>> Also, during my debugging one such instance finished the "punctuate"
> >>> method, which took ~11h, exactly the time the instance was stuck for.
> >>> Changing the backing state store from "persistent" to "inMemory" solved my
> >>> issue: after several days of running, there are no stuck instances.
> >>>
> >>> This leads me to ask: shouldn't Kafka detect such a situation fairly
> >>> quickly, instead of just stopping polling? My guess is that the heartbeat
> >>> thread, which is now separate, continues working fine, and since by
> >>> definition the stream runs a message through the whole pipeline, this step
> >>> probably just looked like it was VERY slow. Not sure what the best approach
> >>> here would be.
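The hypothesis above can be made concrete with a small model. Because the background heartbeat thread (introduced in the 0.10.1 consumer) keeps the member alive in the group regardless of what the processing thread is doing, noticing a stuck pipeline needs a separate progress signal, such as the time since the last poll. This is a hypothetical single-process sketch: `PollWatchdog` and its methods are made up for illustration, and the 5-minute poll interval and 10-second session timeout are arbitrary example values. The Java consumer's own comparable check is the `max.poll.interval.ms` setting, whose exact semantics this sketch does not reproduce.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical watchdog: heartbeats show the process is alive,
 * while the last-poll timestamp shows the processing thread is making progress.
 */
public class PollWatchdog {
    private final AtomicLong lastHeartbeatMs = new AtomicLong();
    private final AtomicLong lastPollMs = new AtomicLong();
    private final long maxPollIntervalMs;

    public PollWatchdog(long startMs, long maxPollIntervalMs) {
        lastHeartbeatMs.set(startMs);
        lastPollMs.set(startMs);
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    /** Called by a background heartbeat thread; keeps firing even if processing is blocked. */
    public void onHeartbeat(long nowMs) {
        lastHeartbeatMs.set(nowMs);
    }

    /** Called by the processing thread each time it returns to poll for new records. */
    public void onPoll(long nowMs) {
        lastPollMs.set(nowMs);
    }

    /** Liveness as seen through heartbeats only. */
    public boolean heartbeatAlive(long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs.get() <= sessionTimeoutMs;
    }

    /** Progress check: has the processing thread failed to poll within the allowed interval? */
    public boolean processingStalled(long nowMs) {
        return nowMs - lastPollMs.get() > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60_000L;
        long elevenHoursMs = 11 * 3_600_000L;
        PollWatchdog watchdog = new PollWatchdog(0L, fiveMinutesMs);
        // The processing thread is stuck in a long punctuate, but heartbeats keep arriving.
        watchdog.onHeartbeat(elevenHoursMs);
        System.out.println(watchdog.heartbeatAlive(elevenHoursMs, 10_000L)); // true
        System.out.println(watchdog.processingStalled(elevenHoursMs));       // true
    }
}
```

With an ~11h punctuate like the one described above, the heartbeat view reports a healthy instance while the poll-progress check flags it as stalled.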
> >>>
> >>> PS: The linked code clearly states "This code is for demonstration purposes
> >>> and was not tested for production usage", so that's on me.
> >>>
> >>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>>> Did you check the logs? Maybe you need to increase the log level to DEBUG
> >>>> to get some more information.
> >>>>
> >>>> Did you double-check the committed offsets via bin/kafka-consumer-groups.sh?
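When reading that tool's output, the LAG column for a partition is the log-end offset minus the group's committed offset. A stuck application typically shows committed offsets that stop advancing while new records keep arriving. Below is a minimal sketch of that arithmetic plus a simple "looks stuck" heuristic, assuming you have sampled both offsets twice for each partition; the `LagCheck` class, its `Sample` type, the heuristic, and the partition names are all hypothetical and not part of the Kafka tooling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: interprets two samples of committed and log-end offsets per partition. */
public class LagCheck {
    /** One observation for a partition: the group's committed offset and the log-end offset. */
    static final class Sample {
        final long committedOffset;
        final long logEndOffset;

        Sample(long committedOffset, long logEndOffset) {
            this.committedOffset = committedOffset;
            this.logEndOffset = logEndOffset;
        }

        long lag() {
            return logEndOffset - committedOffset;
        }
    }

    /** Heuristic: new records arrived between the samples, but the committed offset did not move. */
    static boolean looksStuck(Sample earlier, Sample later) {
        boolean newData = later.logEndOffset > earlier.logEndOffset;
        boolean noProgress = later.committedOffset == earlier.committedOffset;
        return newData && noProgress;
    }

    public static void main(String[] args) {
        Map<String, Sample[]> samples = new LinkedHashMap<>();
        samples.put("input-0", new Sample[] {new Sample(100, 120), new Sample(100, 900)});
        samples.put("input-1", new Sample[] {new Sample(200, 210), new Sample(640, 650)});
        for (Map.Entry<String, Sample[]> e : samples.entrySet()) {
            Sample first = e.getValue()[0];
            Sample second = e.getValue()[1];
            System.out.println(e.getKey() + " lag=" + second.lag() + " stuck=" + looksStuck(first, second));
        }
        // input-0 lag=800 stuck=true
        // input-1 lag=10 stuck=false
    }
}
```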
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 4/28/17 9:22 AM, João Peixoto wrote:
> >>>>> My stream gets stale after a while and simply does not receive any new
> >>>>> messages, i.e. it does not poll.
> >>>>>
> >>>>> I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
> >>>>> brokers are running 0.10.1.1.
> >>>>>
> >>>>> The stream state is RUNNING and there are no exceptions in the logs.
> >>>>>
> >>>>> Looking at the JMX metrics, the threads are there and running, just not
> >>>>> doing anything.
> >>>>> The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
> >>>>> (the max time taken to receive a response to a heartbeat request) reads
> >>>>> 43,361 seconds (about 12 hours), which is consistent with the time of the
> >>>>> hang. Shouldn't this trigger a failure somehow?
> >>>>>
> >>>>> The stream configuration looks something like this:
> >>>>>
> >>>>> import java.util.Properties;
> >>>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
> >>>>> import org.apache.kafka.common.serialization.Serdes;
> >>>>> import org.apache.kafka.streams.StreamsConfig;
> >>>>>
> >>>>> // streamName, myConfig and CustomTimestampExtractor are application-specific (not shown)
> >>>>> Properties props = new Properties();
> >>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >>>>>           CustomTimestampExtractor.class.getName());
> >>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> >>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
> >>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, myConfig.getBrokerList());
> >>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>>>           Serdes.String().getClass().getName());
> >>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>>>           Serdes.ByteArray().getClass().getName());
> >>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>>>           myConfig.getCommitIntervalMs());   // 5000
> >>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> >>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
> >>>>>           myConfig.getStreamThreadsCount()); // 1
> >>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> >>>>>           myConfig.getMaxCacheBytes());      // 524_288_000L (500 MiB)
> >>>>> // consumer settings are passed through to the embedded consumer
> >>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
> >>>>>
> >>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs to
> >>>>> another topic.
> >>>>>
> >>>>> Thanks in advance for the help!
> >>>>>
> >>>>
> >>>>
> >>
> >>
>
>
