kafka-users mailing list archives

From David Chu <david....@appdynamics.com>
Subject Producer#commitTransaction() Not Being Called if New Records Aren't Processed by StreamTask
Date Thu, 10 May 2018 20:33:06 GMT
I have a simple Kafka Streams topology of the format “Source Topic -> Processor ->
Sink Topic” that is configured to use exactly-once processing guarantees.  The Processor
performs a reduce operation on the incoming messages and stores the results to a key-value
state store (with logging enabled).  The contents of the key-value state store are forwarded
to the sink topic as part of a Punctuator implementation which is scheduled to run every 10
seconds (using wall clock time).  It’s also important to note that the commit interval for
the stream application is set to 1 second.  The problem I’m seeing is that if new records
aren’t being consumed off the source topic, then the Producer#commitTransaction() method
is never invoked inside the StreamTask#commitOffsets() method, because the call is guarded by the
"if (commitOffsetNeeded)" condition and the commitOffsetNeeded field always appears to be
false when no new records are being consumed.  This becomes problematic in my case because
the punctuator can be flushing items from the state store to the sink topic even when no new
records are being consumed from the source topic.  However, because the transaction is not
being committed, these messages can’t be consumed by downstream consumers configured with
an isolation level of read_committed.  Below is a more detailed description of the steps
taking place.
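
To make the setup concrete, here is a minimal sketch of the configuration described above, written with plain java.util.Properties so it needs no Kafka dependency. The keys processing.guarantee, commit.interval.ms and isolation.level are the real configuration names; the application id, group id and broker address are hypothetical placeholders, not values from my application.

```java
import java.util.Properties;

class EosConfigSketch {
    // Settings for the Streams application: exactly-once with a 1 second commit interval.
    static Properties streamsProps() {
        Properties p = new Properties();
        p.setProperty("application.id", "reduce-and-punctuate"); // hypothetical name
        p.setProperty("bootstrap.servers", "localhost:9092");    // hypothetical address
        p.setProperty("processing.guarantee", "exactly_once");
        p.setProperty("commit.interval.ms", "1000");
        return p;
    }

    // Settings for a downstream consumer that only sees committed transactional records.
    static Properties downstreamConsumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");    // hypothetical address
        p.setProperty("group.id", "downstream-reader");          // hypothetical name
        p.setProperty("isolation.level", "read_committed");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
        System.out.println(downstreamConsumerProps());
    }
}
```

The 10 second wall-clock punctuation is scheduled in the Processor itself rather than through these properties, so it does not appear here.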

1. The source topic only has a few messages on it and no new messages are arriving.   
2. All the messages are consumed by the Processor and the results are placed into the state
store before the first commit occurs at the 1 second mark.  
3. The first commit occurs at the 1 second mark, causing the StreamTask#commitOffsets() method
to get invoked.  Since messages were processed as part of step #2, the StreamTask#commitOffsetNeeded
field is set to true, which results in the Producer#commitTransaction() method being called
on line 358.  However, at this point in time, all the results are still in the state store
and have not yet been sent to the sink topic.
4. The Punctuator is called at the 10 second mark and it forwards all the entries in the state
store to the sink topic.
5. The next commit interval elapses.  However, because no new incoming events have been processed,
the StreamTask#commitOffsetNeeded field is now false, so when the StreamTask#commitOffsets()
method is invoked it exits without doing any work.  As a result, the Producer#commitTransaction()
method is never called.
6. Since the Producer#commitTransaction() method is never called, the messages which were
placed onto the sink topic by the Punctuator in step #4 can never be seen by consumers configured
with an isolation level of read_committed.
7. Furthermore, calling the ProcessorContext#commit() method within the Punctuator#punctuate()
method does not seem to help the situation since the StreamTask#commitOffsets() method does
not take into consideration the value of the StreamTask#commitRequested field.
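
The steps above can be replayed with a small toy model. This is not the Kafka Streams source; ToyProducer and ToyTask are hypothetical stand-ins built only from the behavior described in this message, and the state store is simplified to a list. Records that have been sent stay "pending" (invisible to read_committed consumers) until commitTransaction() runs.

```java
import java.util.ArrayList;
import java.util.List;

class CommitGuardSimulation {
    // Toy transactional producer: sent records become visible only on commit.
    static class ToyProducer {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void send(String record) { pending.add(record); }

        void commitTransaction() {
            committed.addAll(pending);
            pending.clear();
        }
    }

    // Toy task that mirrors the guard described in the message.
    static class ToyTask {
        final ToyProducer producer = new ToyProducer();
        final List<String> stateStore = new ArrayList<>();
        boolean commitOffsetNeeded = false;

        // Consuming a source record updates the store and marks offsets as dirty.
        void process(String record) {
            stateStore.add(record);
            commitOffsetNeeded = true;
        }

        // The punctuator forwards the store contents to the sink topic.
        void punctuate() {
            for (String r : stateStore) {
                producer.send(r);
            }
            stateStore.clear();
        }

        // The transaction is committed only when new records were consumed.
        void commitOffsets() {
            if (commitOffsetNeeded) {
                producer.commitTransaction();
                commitOffsetNeeded = false;
            }
        }
    }

    static ToyTask runScenario() {
        ToyTask task = new ToyTask();
        task.process("a");      // step 2: records consumed into the state store
        task.process("b");
        task.commitOffsets();   // step 3: commit runs, but nothing has been sent yet
        task.punctuate();       // step 4: store contents sent to the sink topic
        task.commitOffsets();   // step 5: guard is false, so no commit happens
        return task;
    }

    public static void main(String[] args) {
        ToyTask t = runScenario();
        // prints: committed=[] pending=[a, b]
        System.out.println("committed=" + t.producer.committed + " pending=" + t.producer.pending);
    }
}
```

In this model the two punctuated records remain pending indefinitely, which corresponds to step 6.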

So my question is, would it be beneficial to update the logic in the StreamTask#commitOffsets()
method so that, if ProcessorContext#commit() has been called and exactly-once processing has
been enabled, the Producer#commitTransaction() method is still called even if no records
were consumed off the topic?  This would help to handle the case where the punctuate call
itself is producing messages to a downstream topic.
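
As a rough illustration of the change I have in mind, the two guard conditions can be compared as plain boolean functions. This is only a sketch of the idea, not a patch against the real StreamTask; the method names currentGuard and proposedGuard are hypothetical, and eosEnabled is assumed to stand for "exactly-once processing is enabled".

```java
class ProposedCommitGuard {
    // Guard as described in this message: commit only if new records were consumed.
    static boolean currentGuard(boolean commitOffsetNeeded) {
        return commitOffsetNeeded;
    }

    // Suggested guard: also commit when a commit was explicitly requested
    // (for example from a punctuator) and exactly-once processing is enabled.
    static boolean proposedGuard(boolean commitOffsetNeeded,
                                 boolean commitRequested,
                                 boolean eosEnabled) {
        return commitOffsetNeeded || (eosEnabled && commitRequested);
    }

    public static void main(String[] args) {
        // Punctuator called ProcessorContext#commit(), but no new records were consumed.
        System.out.println("current:  " + currentGuard(false));
        System.out.println("proposed: " + proposedGuard(false, true, true));
    }
}
```

With no new records and a requested commit, the current guard evaluates to false while the suggested one evaluates to true, so the transaction containing the punctuated records would be committed.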
