flink-user mailing list archives

From: Stephan Ewen <se...@apache.org>
Subject: Re: Kafka producer sink message loss?
Date: Tue, 07 Jun 2016 11:52:22 GMT
Hi Elias!

The concern you raised about the sink being synchronous is exactly what my
last suggestion should address:

The internal state backends can return a handle that can do the sync in a
background thread. The sink would continue processing messages, and the
checkpoint would only be acknowledged after the background sync has
completed.
We should allow user code to return such a handle as well.
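
As a rough illustration only (the AsyncSnapshotHandle name and shape are made
up; no such user-facing API exists in the Checkpointed interface today),
returning the flush as deferred work might look something like this in Java:

import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical only: a handle whose materialize() runs on a background thread,
// and the checkpoint is acknowledged only after it returns.
interface AsyncSnapshotHandle<S> {
    S materialize() throws Exception;
}

class AsyncFlushExample {
    // instead of flushing on the processing thread, the sink hands the flush
    // back as deferred work for a background thread to complete
    static AsyncSnapshotHandle<Void> snapshotAsync(final KafkaProducer<byte[], byte[]> producer) {
        return () -> {
            producer.flush();   // waits until all outstanding sends are acknowledged
            return null;        // this sink has nothing to store in the checkpoint
        };
    }
}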

We have to think about implications concerning message order, though...

Greetings,
Stephan


On Mon, Jun 6, 2016 at 11:58 PM, Elias Levy <fearsome.lucidity@gmail.com>
wrote:

> On Sun, Jun 5, 2016 at 3:16 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> You raised a good point. Fortunately, there should be a simple way to fix
>> this.
>>
>> The Kafka Sink Function should implement the "Checkpointed" interface. It
>> will get a call to the "snapshotState()" method whenever a checkpoint
>> happens. Inside that call, it should then sync on the callbacks, and only
>> return once all have completed. It may return null (no need to store
>> anything in the checkpoint).
>>
>> As long as the "Checkpointed" method has not returned, the checkpoint will not
>> complete. That way, there will be a "synchronization" point per checkpoint.
>>
>> We can even improve this further in the future: The checkpoint method can
>> return an async state handle. While the async state handle completes its
>> "wait for callbacks" in the background (and only acks the checkpoint after
>> that has completed), the sink function can continue processing.
>>
>> What do you think?
>>
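
A minimal sketch of that suggestion, written as a hand-rolled sink rather than
the actual FlinkKafkaProducer code (class and field names are made up for
illustration), might look like this:

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: send records asynchronously, but block in snapshotState() until
// everything in flight has been acknowledged, and fail the checkpoint if any
// send failed.
public class FlushingKafkaSink extends RichSinkFunction<String>
        implements Checkpointed<Serializable> {

    private final String topic;
    private final Properties producerConfig;

    private transient KafkaProducer<byte[], byte[]> producer;
    private transient AtomicReference<Exception> asyncError;

    public FlushingKafkaSink(String topic, Properties producerConfig) {
        this.topic = topic;
        this.producerConfig = producerConfig;
    }

    @Override
    public void open(Configuration parameters) {
        producer = new KafkaProducer<>(producerConfig);
        asyncError = new AtomicReference<>();
    }

    @Override
    public void invoke(String value) throws Exception {
        checkAsyncError();
        // the callback runs on the producer's I/O thread; remember the first failure
        producer.send(new ProducerRecord<>(topic, value.getBytes()),
                (metadata, exception) -> {
                    if (exception != null) {
                        asyncError.compareAndSet(null, exception);
                    }
                });
    }

    @Override
    public Serializable snapshotState(long checkpointId, long timestamp) throws Exception {
        producer.flush();   // blocks until all pending sends have been acknowledged
        checkAsyncError();  // a failed send fails the checkpoint
        return null;        // nothing needs to go into the checkpoint itself
    }

    @Override
    public void restoreState(Serializable state) {
        // nothing was stored, nothing to restore
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    private void checkAsyncError() throws Exception {
        Exception e = asyncError.get();
        if (e != null) {
            throw new Exception("Failed to send record to Kafka", e);
        }
    }
}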
>
> I opened FLINK-4027 <https://issues.apache.org/jira/browse/FLINK-4027> to
> track the issue.
>
> That seems like an acceptable solution.  Presumably an exception can be
> raised in snapshotState() if there is a Kafka publishing error when calling
> flush() on the Kafka producer, which will cause the checkpoint to fail.
>
> I do wonder what sort of performance penalty using flush() will incur, as
> it is a synchronous call.  I assume no other messages can be processed by
> the sink while inside snapshotState().  In theory a sink could continue
> processing messages, so long as it kept track of pending messages that
> occurred before the barrier and only responded to the snapshotState() call
> once no pending messages from before the barrier remained (a rough sketch of
> that bookkeeping follows below the quoted message).
>
>
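
A rough sketch of the bookkeeping described in the last paragraph above, using
the per-send futures returned by KafkaProducer.send() to remember what was
still pending when the barrier arrived (the tracker class is invented for
illustration; no Flink API at the time lets a sink acknowledge a checkpoint
asynchronously):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.RecordMetadata;

// Remember the futures of all sends issued before the barrier, keep processing
// new records, and treat the checkpoint as done once exactly those pre-barrier
// futures have completed.
class PreBarrierPendingTracker {

    // sends issued since the last barrier; only touched by the sink's processing thread
    private List<Future<RecordMetadata>> currentEpoch = new ArrayList<>();

    // call with the Future returned by producer.send() for every record
    void onSend(Future<RecordMetadata> sendResult) {
        currentEpoch.add(sendResult);
    }

    // on a barrier, freeze the pre-barrier sends and start a fresh epoch
    List<Future<RecordMetadata>> onBarrier() {
        List<Future<RecordMetadata>> preBarrier = currentEpoch;
        currentEpoch = new ArrayList<>();
        return preBarrier;
    }

    // somewhere off the processing thread: wait for the frozen sends, then
    // acknowledge the checkpoint (or fail it if any send failed)
    static void awaitPreBarrierSends(List<Future<RecordMetadata>> preBarrier) throws Exception {
        for (Future<RecordMetadata> f : preBarrier) {
            f.get();   // throws if the corresponding send failed
        }
        // ... acknowledge the checkpoint here ...
    }
}

The message-order caveat Stephan raises above would still need an answer, since
post-barrier records keep flowing to Kafka while the pre-barrier ones are being
awaited.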
