kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka Stream stops polling new messages
Date Mon, 08 May 2017 21:38:13 GMT
Hey,

I am not against opening a JIRA, but I am wondering what we should
describe/report there. If I understand the scenario correctly, João uses
a custom RocksDB store and calls seek() in user code. As it is a bug in
RocksDB that makes seek take so long, I am not sure what we could
improve within Streams to prevent this. The only thing I see right now
is that we could reduce `max.poll.interval.ms`, which we just increased
to guard against failures during long state recreation phases.
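
[Editor's note: a hedged config fragment showing what such an override could look like, in the style of the configuration quoted later in this thread. The value is illustrative, not a recommendation; in 0.10.2.1 Streams raised the consumer's default `max.poll.interval.ms` very high, so a stuck punctuate() is no longer detected via the poll interval unless it is lowered explicitly.]

```java
// Config fragment (illustrative value): lower the consumer's
// max.poll.interval.ms so a thread stuck in punctuate() gets kicked
// out of the group sooner, at the cost of spurious rebalances during
// legitimately long state-restoration phases.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // 10 minutes
```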

Any thoughts?


-Matthias


On 5/3/17 8:48 AM, João Peixoto wrote:
> That'd be great as I'm not familiar with the protocol there
> On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.thereska@gmail.com> wrote:
> 
>> Cool, thanks, shall we open a JIRA?
>>
>> Eno
>>> On 3 May 2017, at 16:16, João Peixoto <joao.hartimer@gmail.com> wrote:
>>>
>>> Actually I need to apologize, I pasted the wrong issue. I meant to paste
>>> https://github.com/facebook/rocksdb/issues/261.
>>>
>>> RocksDB did not produce a crash report since it didn't actually crash. I
>>> performed thread dumps on stale and not-stale instances, which revealed
>>> the common behavior, and I collect and plot several Kafka metrics,
>>> including "punctuate" durations, therefore I know it took a long time
>>> and eventually finished.
>>>
>>> Joao
>>>
>>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.thereska@gmail.com>
>> wrote:
>>>
>>>> Hi there,
>>>>
>>>> Thanks for double checking. Does RocksDB actually crash or produce a
>>>> crash dump? I’m curious how you know that the issue is
>>>> https://github.com/facebook/rocksdb/issues/1121, so just double
>>>> checking with you.
>>>>
>>>> If that’s indeed the case, do you mind opening a JIRA (a copy-paste of
>>>> the below should suffice)? Alternatively let us know and we’ll open it.
>>>> Sounds like we should handle this better.
>>>>
>>>> Thanks,
>>>> Eno
>>>>
>>>>
>>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.hartimer@gmail.com>
>>>> wrote:
>>>>>
>>>>> I believe I found the root cause of my problem. I seem to have hit this
>>>>> RocksDB bug https://github.com/facebook/rocksdb/issues/1121
>>>>>
>>>>> On my stream configuration I have a custom transformer used for
>>>>> deduplicating records, heavily inspired by the
>>>>> EventDeduplicationLambdaIntegrationTest
>>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
>>>>> but adjusted to my use case, with special emphasis on the "punctuate"
>>>>> method.
>>>>>
>>>>> All the stale instances had the main stream thread "RUNNING" the
>>>>> "punctuate" method of this transformer, which in turn was running
>>>>> RocksDB "seekToFirst".
>>>>>
>>>>> Also during my debugging, one such instance finished the "punctuate"
>>>>> method, which took ~11h, exactly the time the instance was stuck for.
>>>>> Changing the backing state store from "persistent" to "inMemory"
>>>>> solved my issue; at least after several days running there are no
>>>>> stuck instances.
>>>>>
>>>>> This leads me to ask, shouldn't Kafka detect such a situation fairly
>>>>> quickly, instead of just stopping polling? My guess is that the
>>>>> heartbeat thread, which is now separate, continues working fine;
>>>>> since by definition the stream runs a message through the whole
>>>>> pipeline, this step probably just looked like it was VERY slow. Not
>>>>> sure what the best approach here would be.
>>>>>
>>>>> PS The linked code clearly states "This code is for demonstration
>>>>> purposes and was not tested for production usage" so that's on me.
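
[Editor's note: a minimal stdlib sketch of the purge pattern such a punctuate() performs, with a TreeMap standing in for the RocksDB-backed store; all names are hypothetical and this is not the poster's code. The point is that the purge does a full scan from the store's first entry (RocksDB's seekToFirst), which is exactly where facebook/rocksdb#261 bites: deleted keys leave tombstones the iterator must skip, so on a large, churned persistent store the scan can take hours.]

```java
import java.util.TreeMap;

// Sketch of a deduplication transformer's bookkeeping: remember when each
// event id was first seen, and periodically purge expired ids. A TreeMap
// stands in for the persistent (RocksDB-backed) state store.
public class DedupPurgeSketch {
    // eventId -> timestamp at which the id was first seen
    private final TreeMap<String, Long> seenIds = new TreeMap<>();
    private final long retentionMs;

    public DedupPurgeSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if the id was already seen within the retention window.
    public boolean isDuplicate(String eventId, long timestamp) {
        return seenIds.putIfAbsent(eventId, timestamp) != null;
    }

    // Analogous to punctuate(): scan the whole store from its first entry
    // and drop ids older than the retention window. On RocksDB this full
    // scan (seekToFirst + next) is the step that got stuck for ~11h.
    public void punctuate(long now) {
        seenIds.entrySet().removeIf(e -> now - e.getValue() > retentionMs);
    }

    public int size() {
        return seenIds.size();
    }
}
```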
>>>>>
>>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matthias@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Did you check the logs? Maybe you need to increase the log level to
>>>>>> DEBUG to get some more information.
>>>>>>
>>>>>> Did you double check committed offsets via
>>>>>> bin/kafka-consumer-groups.sh?
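
[Editor's note: a hedged example of the check Matthias suggests. A Streams app's consumer group id equals its `application.id`; the broker address and group name below are illustrative, and exact flags vary slightly across Kafka versions.]

```shell
# Describe the Streams app's consumer group to see committed offsets and
# per-partition lag; if the lag stays flat while input grows, the app has
# stopped polling.
bin/kafka-consumer-groups.sh --new-consumer \
    --bootstrap-server localhost:9092 \
    --describe --group my-streams-app
```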
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
>>>>>>> My stream gets stale after a while and it simply does not receive
>>>>>>> any new messages, aka does not poll.
>>>>>>>
>>>>>>> I'm using Kafka Streams 0.10.2.1 (the same happens with 0.10.2.0)
>>>>>>> and the brokers are running 0.10.1.1.
>>>>>>>
>>>>>>> The stream state is RUNNING and there are no exceptions in the logs.
>>>>>>>
>>>>>>> Looking at the JMX metrics, the threads are there and running, just
>>>>>>> not doing anything.
>>>>>>> The metric "consumer-coordinator-metrics >
>>>>>>> heartbeat-response-time-max" (the max time taken to receive a
>>>>>>> response to a heartbeat request) reads 43,361 seconds (almost 12
>>>>>>> hours), which is consistent with the time of the hang. Shouldn't
>>>>>>> this trigger a failure somehow?
>>>>>>>
>>>>>>> The stream configuration looks something like this:
>>>>>>>
>>>>>>> Properties props = new Properties();
>>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>>>     CustomTimestampExtractor.class.getName());
>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>>>     myConfig.getBrokerList());
>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>     Serdes.String().getClass().getName());
>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>     Serdes.ByteArray().getClass().getName());
>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>>>>     myConfig.getCommitIntervalMs()); // 5000
>>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>>>>>>>     myConfig.getStreamThreadsCount()); // 1
>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>>>>>>>     myConfig.getMaxCacheBytes()); // 524_288_000L
>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>>>>>>>
>>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and
>>>>>>> outputs to another topic.
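
[Editor's note: a sketch of the topology described above in the 0.10.2 DSL, not the poster's actual code; all topic, store, and method names are hypothetical.]

```java
// Hypothetical sketch: left-join a stream against a KTable and write to
// an output topic, per the description above (0.10.2 KStreamBuilder API).
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> events = builder.stream("input-events");
KTable<String, byte[]> lookup = builder.table("lookup-topic", "lookup-store");
events.leftJoin(lookup, (event, row) -> enrich(event, row)) // enrich() is hypothetical
      .to("output-topic");
```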
>>>>>>>
>>>>>>> Thanks in advance for the help!
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>
>>
> 

