kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka Stream stops polling new messages
Date Tue, 09 May 2017 16:27:23 GMT
It would be great if we could help simplify debugging here. Any ideas?


-Matthias


On 5/9/17 7:11 AM, João Peixoto wrote:
> I'll leave it to your discretion. Once I realized the problem it was an
> easy workaround; the bad experience was debugging and figuring out what
> was going on.
> 
> Thanks for the help once again
> 
> JP
> On Tue, May 9, 2017 at 4:36 AM Eno Thereska <eno.thereska@gmail.com> wrote:
> 
>> Yeah, that's a good point; I won't take any action then.
>>
>> Eno
>>
>> On Mon, May 8, 2017 at 10:38 PM, Matthias J. Sax <matthias@confluent.io>
>> wrote:
>>
>>> Hey,
>>>
>>> I am not against opening a JIRA, but I am wondering what we should
>>> describe/report there. If I understand the scenario correctly, João uses
>>> a custom RocksDB store and calls seek() in user code. As it is a bug in
>>> RocksDB that seek() takes so long, I am not sure what we could improve
>>> within Streams to prevent this. The only thing I see right now is that
>>> we could reduce `max.poll.interval.ms`, which we just increased to guard
>>> against failures during long state recreation phases.
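
A minimal sketch of the knob in question (the value is illustrative, not a recommendation, and this assumes the application's existing `props` object):

```java
// Hypothetical override: lowering max.poll.interval.ms makes a thread that
// stops calling poll() (e.g. one stuck in a long punctuate()) fall out of
// the consumer group and fail visibly, instead of hanging silently. The
// flip side is that legitimately long state recreation phases would then
// also trigger rebalances.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes
```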
>>>
>>> Any thoughts?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 5/3/17 8:48 AM, João Peixoto wrote:
>>>> That'd be great as I'm not familiar with the protocol there
>>>> On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.thereska@gmail.com>
>>> wrote:
>>>>
>>>>> Cool, thanks, shall we open a JIRA?
>>>>>
>>>>> Eno
>>>>>> On 3 May 2017, at 16:16, João Peixoto <joao.hartimer@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Actually I need to apologize, I pasted the wrong issue. I meant to
>>>>>> paste https://github.com/facebook/rocksdb/issues/261.
>>>>>>
>>>>>> RocksDB did not produce a crash report since it didn't actually crash.
>>>>>> I performed thread dumps on stale and non-stale instances, which
>>>>>> revealed the common behavior, and I collect and plot several Kafka
>>>>>> metrics, including "punctuate" durations, so I know it took a long
>>>>>> time and eventually finished.
>>>>>>
>>>>>> Joao
>>>>>>
>>>>>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.thereska@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> Thanks for double checking. Does RocksDB actually crash or produce
>>>>>>> a crash dump? I'm curious how you know that the issue is
>>>>>>> https://github.com/facebook/rocksdb/issues/1121, so just double
>>>>>>> checking with you.
>>>>>>>
>>>>>>> If that's indeed the case, do you mind opening a JIRA (a copy-paste
>>>>>>> of the below should suffice)? Alternatively, let us know and we'll
>>>>>>> open it. Sounds like we should handle this better.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eno
>>>>>>>
>>>>>>>
>>>>>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.hartimer@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> I believe I found the root cause of my problem. I seem to have hit
>>>>>>>> this RocksDB bug: https://github.com/facebook/rocksdb/issues/1121
>>>>>>>>
>>>>>>>> In my stream configuration I have a custom transformer used for
>>>>>>>> deduplicating records, heavily inspired by the
>>>>>>>> EventDeduplicationLambdaIntegrationTest
>>>>>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
>>>>>>>> but adjusted to my use case, with special emphasis on the
>>>>>>>> "punctuate" method.
>>>>>>>>
>>>>>>>> All the stale instances had the main stream thread "RUNNING" the
>>>>>>>> "punctuate" method of this transformer, which in turn was running
>>>>>>>> RocksDB "seekToFirst".
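
The pattern described above can be sketched in simplified form as follows. This is not João's actual code: all names are illustrative, and a TreeMap stands in for the RocksDB-backed state store. In the real transformer, scanning the store from its first key is what triggers RocksDB's seekToFirst, where the multi-hour stall occurred.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of a deduplication store with a punctuate-style purge:
// scan the whole store from the first key and delete expired entries.
public class DedupPurgeSketch {
    private final TreeMap<String, Long> store = new TreeMap<>(); // key -> last-seen timestamp
    private final long retentionMs;

    public DedupPurgeSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Remember that a key was seen (analogue of transform()).
    public void record(String key, long timestampMs) {
        store.put(key, timestampMs);
    }

    public boolean isDuplicate(String key) {
        return store.containsKey(key);
    }

    // Analogue of punctuate(now): iterate from the first key and delete
    // entries older than the retention window. On a persistent store this
    // full scan starts with a seek to the first key.
    public void punctuate(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < nowMs - retentionMs) {
                it.remove();
            }
        }
    }

    public int size() {
        return store.size();
    }

    public static void main(String[] args) {
        DedupPurgeSketch dedup = new DedupPurgeSketch(1000);
        dedup.record("a", 0);
        dedup.record("b", 900);
        dedup.punctuate(1500);                      // "a" expired, "b" retained
        System.out.println(dedup.size());           // prints 1
        System.out.println(dedup.isDuplicate("b")); // prints true
    }
}
```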
>>>>>>>>
>>>>>>>> Also, during my debugging one such instance finished the "punctuate"
>>>>>>>> method, which took ~11h, exactly the time the instance was stuck
>>>>>>>> for. Changing the backing state store from "persistent" to
>>>>>>>> "inMemory" solved my issue; after several days running, there are no
>>>>>>>> stuck instances.
>>>>>>>>
>>>>>>>> This leads me to ask: shouldn't Kafka detect such a situation fairly
>>>>>>>> quickly, instead of just stopping polling? My guess is that the
>>>>>>>> heartbeat thread, which is now separate, continues working fine;
>>>>>>>> since by definition the stream runs a message through the whole
>>>>>>>> pipeline, this step probably just looked like it was VERY slow. Not
>>>>>>>> sure what the best approach here would be.
>>>>>>>>
>>>>>>>> PS The linked code clearly states "This code is for demonstration
>>>>>>>> purposes and was not tested for production usage", so that's on me.
>>>>>>>>
>>>>>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matthias@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Did you check the logs? Maybe you need to increase the log level
>>>>>>>>> to DEBUG to get some more information.
>>>>>>>>>
>>>>>>>>> Did you double check committed offsets via
>>>>>>>>> bin/kafka-consumer-groups.sh?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
>>>>>>>>>> My stream goes stale after a while and simply does not receive any
>>>>>>>>>> new messages, i.e. it does not poll.
>>>>>>>>>>
>>>>>>>>>> I'm using Kafka Streams 0.10.2.1 (the same happens with 0.10.2.0),
>>>>>>>>>> and the brokers are running 0.10.1.1.
>>>>>>>>>>
>>>>>>>>>> The stream state is RUNNING and there are no exceptions in the
>>>>>>>>>> logs.
>>>>>>>>>>
>>>>>>>>>> Looking at the JMX metrics, the threads are there and running,
>>>>>>>>>> just not doing anything.
>>>>>>>>>> The metric "consumer-coordinator-metrics >
>>>>>>>>>> heartbeat-response-time-max" (the max time taken to receive a
>>>>>>>>>> response to a heartbeat request) reads 43,361 seconds (almost 12
>>>>>>>>>> hours), which is consistent with the duration of the hang.
>>>>>>>>>> Shouldn't this trigger a failure somehow?
>>>>>>>>>>
>>>>>>>>>> The stream configuration looks something like this:
>>>>>>>>>>
>>>>>>>>>> Properties props = new Properties();
>>>>>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>>>>>>     CustomTimestampExtractor.class.getName());
>>>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>>>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
>>>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>>>>>>     myConfig.getBrokerList());
>>>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>>>>     Serdes.String().getClass().getName());
>>>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>>>>     Serdes.ByteArray().getClass().getName());
>>>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>>>>>>>     myConfig.getCommitIntervalMs()); // 5000
>>>>>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>>>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>>>>>>>>>>     myConfig.getStreamThreadsCount()); // 1
>>>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>>>>>>>>>>     myConfig.getMaxCacheBytes()); // 524_288_000L
>>>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>>>>>>>>>>
>>>>>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and
>>>>>>>>>> outputs to another topic.
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for the help!
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 

