kafka-users mailing list archives

From João Peixoto <joao.harti...@gmail.com>
Subject Re: Kafka Stream stops polling new messages
Date Tue, 09 May 2017 14:11:09 GMT
I'll leave it to your discretion; after realizing the problem, it was an
easy workaround. The bad experience was debugging and figuring out what
was going on.

Thanks for the help once again

JP
On Tue, May 9, 2017 at 4:36 AM Eno Thereska <eno.thereska@gmail.com> wrote:

> Yeah that's a good point, I'm not taking action then.
>
> Eno
>
> On Mon, May 8, 2017 at 10:38 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > Hey,
> >
> > I am not against opening a JIRA, but I am wondering what we should
> > describe/report there. If I understand the scenario correctly, João uses
> > a custom RocksDB store and calls seek() in user code land. As it is a
> > bug in RocksDB that seek takes so long, I am not sure what we could
> > improve within Streams to prevent this? The only thing I am seeing
> > right now is that we could reduce `max.poll.interval.ms`, which we just
> > increased to guard against failures during long state restoration phases.
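For context, the split between the poll interval and the heartbeat can be set explicitly on a plain consumer configuration. A minimal sketch using literal config-key strings; the values are illustrative, not recommendations:

```java
import java.util.Properties;

public class PollIntervalConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "max.poll.interval.ms" bounds how long processing one batch may take
        // before the consumer is evicted from the group; lowering it makes a
        // stuck punctuate() surface as a rebalance instead of a silent hang.
        props.put("max.poll.interval.ms", "300000"); // 5 minutes, illustrative
        // "session.timeout.ms" only covers the background heartbeat thread,
        // which keeps running even while processing is blocked.
        props.put("session.timeout.ms", "10000");
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

With the defaults in 0.10.2, the heartbeat thread keeps the member alive while a slow punctuate() only trips the (much larger) poll interval, which matches the behavior described in this thread.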
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> >
> > On 5/3/17 8:48 AM, João Peixoto wrote:
> > > That'd be great as I'm not familiar with the protocol there.
> > > On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.thereska@gmail.com>
> > > wrote:
> > >
> > >> Cool, thanks, shall we open a JIRA?
> > >>
> > >> Eno
> > >>> On 3 May 2017, at 16:16, João Peixoto <joao.hartimer@gmail.com>
> > >>> wrote:
> > >>>
> > >>> Actually I need to apologize, I pasted the wrong issue. I meant to
> > >>> paste https://github.com/facebook/rocksdb/issues/261.
> > >>>
> > >>> RocksDB did not produce a crash report since it didn't actually
> > >>> crash. I performed thread dumps on stale and not-stale instances,
> > >>> which revealed the common behavior, and I collect and plot several
> > >>> Kafka metrics, including "punctuate" durations, so I know it took a
> > >>> long time and eventually finished.
> > >>>
> > >>> Joao
> > >>>
> > >>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.thereska@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi there,
> > >>>>
> > >>>> Thanks for double checking. Does RocksDB actually crash or produce
> > >>>> a crash dump? I'm curious how you know that the issue is
> > >>>> https://github.com/facebook/rocksdb/issues/1121, so just double
> > >>>> checking with you.
> > >>>>
> > >>>> If that's indeed the case, do you mind opening a JIRA (a copy-paste
> > >>>> of the below should suffice)? Alternatively let us know and we'll
> > >>>> open it. Sounds like we should handle this better.
> > >>>>
> > >>>> Thanks,
> > >>>> Eno
> > >>>>
> > >>>>
> > >>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.hartimer@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>> I believe I found the root cause of my problem. I seem to have hit
> > >>>>> this RocksDB bug: https://github.com/facebook/rocksdb/issues/1121
> > >>>>>
> > >>>>> In my stream configuration I have a custom transformer used for
> > >>>>> deduplicating records, heavily inspired by the
> > >>>>> EventDeduplicationLambdaIntegrationTest
> > >>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
> > >>>>> but adjusted to my use case, with special emphasis on the
> > >>>>> "punctuate" method.
> > >>>>>
> > >>>>> All the stale instances had the main stream thread "RUNNING" the
> > >>>>> "punctuate" method of this transformer, which in turn was running
> > >>>>> RocksDB "seekToFirst".
> > >>>>>
> > >>>>> Also, during my debugging one such instance finished the
> > >>>>> "punctuate" method, which took ~11h, exactly the time the instance
> > >>>>> was stuck for.
> > >>>>> Changing the backing state store from "persistent" to "inMemory"
> > >>>>> solved my issue; at least after several days running, there are no
> > >>>>> stuck instances.
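The punctuate() pass in the linked example is essentially a full scan over the state store that evicts expired event ids. A stdlib-only sketch of that eviction logic, with a TreeMap standing in for the persistent store; class and method names here are illustrative, not taken from the actual transformer:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class DedupEvictionSketch {
    // eventId -> timestamp of first sighting; a TreeMap stands in for the
    // persistent store the real transformer iterates (seekToFirst() and on)
    private final TreeMap<String, Long> seen = new TreeMap<>();
    private final long maintainDurationMs;

    DedupEvictionSketch(long maintainDurationMs) {
        this.maintainDurationMs = maintainDurationMs;
    }

    // Returns true the first time an id is seen inside the retention window.
    boolean isNew(String eventId, long nowMs) {
        Long first = seen.get(eventId);
        if (first != null && nowMs - first <= maintainDurationMs) {
            return false; // duplicate within the window
        }
        seen.put(eventId, nowMs);
        return true;
    }

    // What punctuate() does: a full scan over the store dropping expired
    // ids. On a RocksDB-backed store this scan starts with seekToFirst(),
    // which is where the reported ~11h stall occurred.
    void punctuate(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > maintainDurationMs) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        DedupEvictionSketch dedup = new DedupEvictionSketch(1000);
        System.out.println(dedup.isNew("a", 0));    // true, first sighting
        System.out.println(dedup.isNew("a", 500));  // false, duplicate
        dedup.punctuate(2000);                      // evicts "a"
        System.out.println(dedup.isNew("a", 2001)); // true again after eviction
    }
}
```

The scan cost grows with store size, which is why the same logic that is instant on an in-memory map can stall for hours on a degraded RocksDB iterator.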
> > >>>>>
> > >>>>> This leads me to ask: shouldn't Kafka detect such a situation
> > >>>>> fairly quickly, instead of just stopping polling? My guess is that
> > >>>>> the heartbeat thread, which is now separate, continues working
> > >>>>> fine; since by definition the stream runs a message through the
> > >>>>> whole pipeline, this step probably just looked like it was VERY
> > >>>>> slow. Not sure what the best approach here would be.
> > >>>>>
> > >>>>> PS The linked code clearly states "This code is for demonstration
> > >>>>> purposes and was not tested for production usage", so that's on me.
> > >>>>>
> > >>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax
> > >>>>> <matthias@confluent.io> wrote:
> > >>>>>
> > >>>>>> Did you check the logs? Maybe you need to increase the log level
> > >>>>>> to DEBUG to get some more information.
> > >>>>>>
> > >>>>>> Did you double check committed offsets via
> > >>>>>> bin/kafka-consumer-groups.sh?
> > >>>>>>
> > >>>>>> -Matthias
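For reference, a Streams application's `application.id` doubles as its consumer group id, so describing the group shows the committed offsets and lag per partition. The broker address and group name below are placeholders, and older tool versions may additionally require the `--new-consumer` flag:

```shell
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-stream-app
```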
> > >>>>>>
> > >>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
> > >>>>>>> My stream gets stale after a while and it simply does not
> > >>>>>>> receive any new messages, aka does not poll.
> > >>>>>>>
> > >>>>>>> I'm using Kafka Streams 0.10.2.1 (the same happens with
> > >>>>>>> 0.10.2.0) and the brokers are running 0.10.1.1.
> > >>>>>>>
> > >>>>>>> The stream state is RUNNING and there are no exceptions in the
> > >>>>>>> logs.
> > >>>>>>>
> > >>>>>>> Looking at the JMX metrics, the threads are there and running,
> > >>>>>>> just not doing anything.
> > >>>>>>> The metric "consumer-coordinator-metrics >
> > >>>>>>> heartbeat-response-time-max" (the max time taken to receive a
> > >>>>>>> response to a heartbeat request) reads 43,361 seconds (almost 12
> > >>>>>>> hours), which is consistent with the time of the hang. Shouldn't
> > >>>>>>> this trigger a failure somehow?
> > >>>>>>>
> > >>>>>>> The stream configuration looks something like this:
> > >>>>>>>
> > >>>>>>> Properties props = new Properties();
> > >>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > >>>>>>>     CustomTimestampExtractor.class.getName());
> > >>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> > >>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
> > >>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > >>>>>>>     myConfig.getBrokerList());
> > >>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > >>>>>>>     Serdes.String().getClass().getName());
> > >>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>>>>>>     Serdes.ByteArray().getClass().getName());
> > >>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > >>>>>>>     myConfig.getCommitIntervalMs()); // 5000
> > >>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> > >>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
> > >>>>>>>     myConfig.getStreamThreadsCount()); // 1
> > >>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > >>>>>>>     myConfig.getMaxCacheBytes()); // 524_288_000L
> > >>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
> > >>>>>>>
> > >>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and
> > >>>>>>> outputs to another topic.
> > >>>>>>>
> > >>>>>>> Thanks in advance for the help!
> > >>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>
> > >>
> > >
> >
> >
>
