samza-dev mailing list archives

From Yan Fang <yanfang...@gmail.com>
Subject Re: consistency between input, output and changelog streams
Date Tue, 07 Apr 2015 21:12:25 GMT
Hi Bart,

In terms of your assumption,

* Ts <= To , this is correct. The code backing this assumption is in RunLoop
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala>:
the commit is called after the process and window methods. E.g. process1
-> commit T1 -> process2 -> fail -> commit T2 will not happen. When
restarting the job, it consumes from T1, and the messages between T1 and T2
will be reprocessed.
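That ordering can be sketched as follows. This is a hypothetical illustration, not the actual RunLoop code: the names are made up, and the commit here fires every two messages purely to show the replay window (the real commit interval is time-based).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: commit happens only after process completes, so a
// failure between commits replays input from the last durable checkpoint.
public class RunLoopSketch {
    static int lastCheckpoint = 0;            // offset of the last durable commit (Ti)
    static List<String> processedLog = new ArrayList<>();

    static void run(List<String> input, int failAtOffset) {
        for (int offset = lastCheckpoint; offset < input.size(); offset++) {
            if (offset == failAtOffset) {
                throw new RuntimeException("container failed before next commit");
            }
            processedLog.add(input.get(offset));  // process(msg)
            if ((offset + 1) % 2 == 0) {
                lastCheckpoint = offset + 1;      // commit: checkpoint the offset
            }
        }
        lastCheckpoint = input.size();            // final commit on clean shutdown
    }

    public static void main(String[] args) {
        List<String> input = List.of("m0", "m1", "m2", "m3");
        try {
            run(input, 3);                        // fail after m2 is processed, before commit T2
        } catch (RuntimeException e) {
            run(input, -1);                       // restart: resumes from T1 (offset 2)
        }
        System.out.println(processedLog);         // prints [m0, m1, m2, m2, m3]
    }
}
```

Note that m2 is processed twice: it was handled before the failure but its offset was never committed, which is exactly the at-least-once replay between T1 and T2 described above.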

* Ti <= Ts is not always true. There are two scenarios (we call db.put()
in process/window):

    1) process1 -> db.put1 succeeds -> commit T1 -> process2 -> db.put2
succeeds -> the following process fails -> commit T2 will not happen. In this
scenario, Ti <= Ts because the latest changelog write happens later than the
checkpoint. When we reprocess the stream from T1, the same
operation will happen twice because db.put2 already succeeded.* It is
usually fine to put the same data twice or delete the same data twice.
It may cause issues if db.put2 accumulates based on its
previous value. -- @Chris, is this true ?*

    2) process1 -> db.put1 -> commit T1 -> process2 -> fail. This is Ti >=
Ts because the latest checkpoint happens after the changelog write.
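The distinction in scenario 1 can be shown with a toy key-value map (hypothetical, not Samza's store API): replaying a put of an absolute value is idempotent, but replaying a read-modify-write that accumulates on the previous value double-counts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: replaying db.put is harmless for absolute
// values but corrupts state for accumulating (read-modify-write) updates.
public class ReplaySketch {
    static Map<String, Integer> store = new HashMap<>();

    public static void main(String[] args) {
        // Idempotent: putting the same absolute value twice leaves one result.
        store.put("lastSeen", 42);
        store.put("lastSeen", 42);                  // replayed after restart
        System.out.println(store.get("lastSeen"));  // prints 42 either way

        // Not idempotent: accumulating on the previous value double-counts.
        store.merge("count", 1, Integer::sum);
        store.merge("count", 1, Integer::sum);      // replayed after restart
        System.out.println(store.get("count"));     // prints 2, though only one message arrived
    }
}
```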

    The changelog code is in LoggedStore
<https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala>:
you can see that the changelog is written after each db operation
(put/delete, etc.). All the db operations are called in the process or window
method.
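The pattern described there can be sketched like this. The names below are illustrative, not Samza's actual API; the point is only that every store write is followed by an append to the changelog, as described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a logged store: each write hits the underlying
// store and is then appended to a changelog (standing in for a Kafka topic).
public class LoggedStoreSketch {
    static Map<String, String> store = new HashMap<>();
    static List<String> changelog = new ArrayList<>();

    static void put(String key, String value) {
        store.put(key, value);
        changelog.add("put:" + key + "=" + value);  // changelog written after the db operation
    }

    static void delete(String key) {
        store.remove(key);
        changelog.add("delete:" + key);             // deletes are logged too
    }

    public static void main(String[] args) {
        put("session:1", "started");
        delete("session:1");
        System.out.println(changelog);  // prints [put:session:1=started, delete:session:1]
    }
}
```

Replaying this changelog from the beginning rebuilds the store up to the last durable changelog write, which is the Ts in the discussion above.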

Hope this helps.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Tue, Apr 7, 2015 at 7:27 AM, Bart Wyatt <bart.wyatt@dsvolition.com>
wrote:

> We are trying to make sure that we are handling the proper edge cases in
> our stateful tasks so that our processes can survive failure well.
>
> Given the changelog will recreate the KV store (state) up to the point of
> time of the last durable changelog write(Ts), the checkpoint will start
> input from the point of time represented in the last durable checkpoint
> write(Ti) and the output will have messages from it at the 3rd point in
> time of the last durable output write(To), our current assumption is that
> in all recovery cases:
>
> Ti <= Ts <= To
>
> This means that some input may be "replayed" from the point of view of the
> KV store which is handled by normal at-least-once-delivery semantics
> processing and that we may duplicate output messages that would have been
> produced between Ts and To which is also consistent with
> at-least-once-delivery.
>
> However, I cannot find code that backs these assumptions, and I'm hoping
> I've just missed it, because:
>
> If To < Ts, then we may drop output because the state assumed it was
> already written and due to timing of actual writes to kafka or durability
> concerns the output is not there.  This is important for a job, for
> example, that emits "session started @ X" messages on the first message for
> any given session key.  The state will see a repeated message as a
> duplicate and not emit the output.  I think this is solvable in the job as
> long as To >= Ti, but I am not certain the solution is generally applicable
> to tasks where side-effects of previous input exist in the state and have
> an effect on future output.
>
> If Ts < Ti, then our stateful task will effectively drop input, even
> though it may have produced some or all of the output for those messages in
> its previous incarnation, as the state used for all future processes will
> not have the side effects of processing the messages between Ts and Ti. We
> see no solution for this at the task level as it would require collusion
> between two backing systems (checkpoints and changelogs) to correct,
> presumably by rewinding Ti to Ts.
>
> Perhaps my code search failed because I was expecting some colluding
> system that would wait for output to write out before writing changelog
> entries and then again before checkpoints, and that was too presumptive.  Is
> there something about the code, the assumption or my edge analysis that
> I've missed to address this?
>
> -Bart
>
>
>
