kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: Producer#commitTransaction() Not Being Called if New Records Aren't Processed by StreamTask
Date Fri, 11 May 2018 17:39:53 GMT
Hello David,

Thanks for reporting your observations. I agree with you that we should
improve on stream task committing mechanism. In fact, there is already a
JIRA opened but for another motivation:

Feel free to take on this JIRA and submit a PR.


On Thu, May 10, 2018 at 1:33 PM, David Chu <david.chu@appdynamics.com>

> I have a simple Kafka Streams topology of the format “Source Topic ->
> Processor -> Sink Topic” that is configured to use exactly-once processing
> guarantees.  The Processor performs a reduce operation on the incoming
> messages and stores the results to a key-value state store (with logging
> enabled).  The contents of the key-value state store are forwarded to the
> sink topic as part of a Punctuator implementation which is scheduled to run
> every 10 seconds (using wall clock time).  It’s also important to note that
> the commit interval for the stream application is set to 1 second.  The
> problem I’m seeing is that if new records aren’t being consumed off the
> source topic, then the Producer#commitTransaction() method is never invoked
> inside the StreamTask#commitOffsets() method since it’s blocked by the "if
> (commitOffsetNeeded)” condition - the commitOffsetNeeded field always
> appears to be false since no new records are being consumed.  This becomes
> problematic in my case because the punctuator can be flushing items from
> the state store to the sink topic even when no new records are being
> consumed from the source topic.  However, because the transaction is not
> being committed these messages can’t be consumed by downstream consumers
> configured with an isolation level of read_committed.  Below is a more
> detailed description of the steps which are taking place.
> 1. The source topic only has a few messages on it and no new messages are
> arriving.
> 2. All the messages are consumed by the Processor and the results are
> placed into the state store before the first commit occurs at the 1 second
> mark.
> 3. The first commit occurs at the 1 second mark causing the
> StreamTask#commitOffsets() method to get invoked.  Since messages were
> processed as part of step #2 the StreamTask#commitOffsetNeeded field is set
> to true which results in the Producer#commitTransaction() method being
> called on line 358.  However, at this point in time, all the results are
> still in the state store and have not yet been sent to the sink topic.
> 4. The Punctuator is called at the 10 second mark and it forwards all the
> entries in the state store to the sink topic.
> 5. The next commit interval elapses, however, because no new incoming
> events have been processed, the StreamTask#commitOffsetNeeded field is now
> set to false so when the StreamTask#commitOffsets() method is invoked it
> exits without doing any work.  As a result, the
> Producer#commitTransaction() method is never called.
> 6. Since the Producer#commitTransaction() method is never called, the
> messages which were placed onto the sink topic by the Punctuator in step #4
> can never be seen by consumers configured with an isolation level of
> read_committed.
> 7. Furthermore, calling the ProcessorContext#commit() method within the
> Punctuator#punctuate() method does not seem to help the situation since the
> StreamTask#commitOffsets() method does not take into consideration the
> value of the StreamTask#commitRequested field.
> So my question is, would it be beneficial to update the logic in the
> StreamTask#commitOffsets() method so that if ProcessorContext#commit() has
> been called and exactly-once processing has been enabled that the
> Producer#commitTransaction() method is still called even if no records were
> consumed off the topic?  This would help to handle the case where the
> punctuate call itself is producing messages to a downstream topic.
> Thanks,
> David

-- Guozhang

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message