kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Producer#commitTransaction() Not Being Called if New Records Aren't Processed by StreamTask
Date Tue, 15 May 2018 22:10:59 GMT
Thanks for reporting this @David!

@Guozhang: I actually think this is two different issues. This is also
exposed in a current PR:
https://github.com/apache/kafka/pull/4912/files#r188179256

I created https://issues.apache.org/jira/browse/KAFKA-6906 for this issue.


-Matthias



On 5/11/18 10:39 AM, Guozhang Wang wrote:
> Hello David,
> 
> Thanks for reporting your observations. I agree with you that we should
> improve on stream task committing mechanism. In fact, there is already a
> JIRA opened but for another motivation:
> https://issues.apache.org/jira/browse/KAFKA-5510
> 
> 
> Feel free to take on this JIRA and submit a PR.
> 
> Guozhang
> 
> 
> On Thu, May 10, 2018 at 1:33 PM, David Chu <david.chu@appdynamics.com>
> wrote:
> 
>> I have a simple Kafka Streams topology of the format “Source Topic ->
>> Processor -> Sink Topic” that is configured to use exactly-once processing
>> guarantees.  The Processor performs a reduce operation on the incoming
>> messages and stores the results to a key-value state store (with logging
>> enabled).  The contents of the key-value state store are forwarded to the
>> sink topic as part of a Punctuator implementation which is scheduled to run
>> every 10 seconds (using wall clock time).  It’s also important to note that
>> the commit interval for the stream application is set to 1 second.  The
>> problem I’m seeing is that if new records aren’t being consumed off the
>> source topic, then the Producer#commitTransaction() method is never invoked
>> inside the StreamTask#commitOffsets() method since it’s blocked by the "if
>> (commitOffsetNeeded)” condition - the commitOffsetNeeded field always
>> appears to be false since no new records are being consumed.  This becomes
>> problematic in my case because the punctuator can be flushing items from
>> the state store to the sink topic even when no new records are being
>> consumed from the source topic.  However, because the transaction is not
>> being committed these messages can’t be consumed by downstream consumers
>> configured with an isolation level of read_committed.  Below is a more
>> detailed description of the steps which are taking place.
>>
>> 1. The source topic only has a few messages on it and no new messages are
>> arriving.
>> 2. All the messages are consumed by the Processor and the results are
>> placed into the state store before the first commit occurs at the 1 second
>> mark.
>> 3. The first commit occurs at the 1 second mark causing the
>> StreamTask#commitOffsets() method to get invoked.  Since messages were
>> processed as part of step #2 the StreamTask#commitOffsetNeeded field is set
>> to true which results in the Producer#commitTransaction() method being
>> called on line 358.  However, at this point in time, all the results are
>> still in the state store and have not yet been sent to the sink topic.
>> 4. The Punctuator is called at the 10 second mark and it forwards all the
>> entries in the state store to the sink topic.
>> 5. The next commit interval elapses, however, because no new incoming
>> events have been processed, the StreamTask#commitOffsetNeeded field is now
>> set to false so when the StreamTask#commitOffsets() method is invoked it
>> exits without doing any work.  As a result, the
>> Producer#commitTransaction() method is never called.
>> 6. Since the Producer#commitTransaction() method is never called, the
>> messages which were placed onto the sink topic by the Punctuator in step #4
>> can never be seen by consumers configured with an isolation level of
>> read_committed.
>> 7. Furthermore, calling the ProcessorContext#commit() method within the
>> Punctuator#punctuate() method does not seem to help the situation since the
>> StreamTask#commitOffsets() method does not take into consideration the
>> value of the StreamTask#commitRequested field.
>>
>> So my question is, would it be beneficial to update the logic in the
>> StreamTask#commitOffsets() method so that if ProcessorContext#commit() has
>> been called and exactly-once processing has been enabled that the
>> Producer#commitTransaction() method is still called even if no records were
>> consumed off the topic?  This would help to handle the case where the
>> punctuate call itself is producing messages to a downstream topic.
>>
>> Thanks,
>> David
> 
> 
> 
> 


Mime
View raw message