samza-dev mailing list archives

From Bart Wyatt <bart.wy...@dsvolition.com>
Subject RE: consistency between input, output and changelog streams
Date Wed, 08 Apr 2015 14:19:33 GMT
This sounds inverted to me.

If we write to a LoggedStore during process and window, and we write checkpoints in commit,
then this should ensure the Ti <= Ts assertion.  Ts moves forward in time during process/window,
and Ti catches up on commit.  If the process dies in between, then at worst Ts is ahead of
Ti.  That is good, because on recovery the messages between Ti and Ts are simply replayed.  So I think
this covers any consistency problems between checkpoints and the KV store.

I still think there is an issue with KV and output consistency.  If KV values are immediately
committed to the changelog when you do a db.put, are output messages immediately committed
to the output stream when you call Collector.send()?  Looking through it, that may be the
case.  If so, the job can ensure that output is sent before state is updated, to guarantee
that on failure/recovery Ts is before To (see the sketch below).
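
A minimal sketch of that ordering inside process(), assuming a Samza
StreamTask with illustrative store and stream names (this is not code from
the Samza tree):

import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

class SendThenPutTask extends StreamTask with InitableTask {
  private var state: KeyValueStore[String, String] = _
  private val output = new SystemStream("kafka", "output-topic") // illustrative

  override def init(config: Config, context: TaskContext): Unit = {
    state = context.getStore("state").asInstanceOf[KeyValueStore[String, String]]
  }

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    // 1. Emit output first ...
    collector.send(new OutgoingMessageEnvelope(output, envelope.getKey, envelope.getMessage))
    // 2. ... then record the side effect in state, so that a failure between
    //    the two steps recovers with Ts behind To rather than ahead of it.
    state.put(envelope.getKey.asInstanceOf[String], envelope.getOffset)
  }
}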

This code, however, has me worried:
https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171

This makes it look like Ts moves forward in time first (#160), then output (#166), then the
input checkpoint (#170).  If we fail between storageManager.flush() and collector.flush(),
won't we have a recovery scenario where our storage contains side effects of input messages
that are not represented in the output stream or the input checkpoint?  That seems to create
a situation where a naive process that produces output based on a combination of input and
state may not produce the expected output.  Again, this may be something that can be addressed
in the process method itself in some (maybe all) cases, but it is a caveat that I hadn't seen
mentioned elsewhere.
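
For reference, a paraphrase of the ordering in that method, with the
manager plumbing trimmed down to abstract members (this is the shape of the
linked Scala, not the exact Samza internals):

trait CommitOrder {
  def flushStores(): Unit      // makes the KV changelog durable  (Ts)
  def flushOutput(): Unit      // makes buffered output durable   (To)
  def writeCheckpoint(): Unit  // makes the input offsets durable (Ti)

  def commit(): Unit = {
    flushStores()     // 1. Ts advances first (#160) ...
    flushOutput()     // 2. ... then To (#166) ...
    writeCheckpoint() // 3. ... then Ti (#170).
  }
}
// A crash between steps 1 and 2 recovers with Ts ahead of both To and Ti:
// the state contains side effects of inputs whose output never became durable.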

The example is a naïve implementation of a task that outputs the first of a set of messages
defined by a grouping field.  In the state, you would store whether you had seen a message
from a given group.  If the state is empty for a group, then you emit that message and update
the state to reflect that you have seen one.  This is safe for at-least-once delivery
semantics because repeated messages after the first can simply be dropped.  However, if
we fail in the way described above, then when we recover we will replay the first message, but
our state will indicate that we have already seen a message for that group, and we will not
produce output.   This violates the implied "at least once" output semantics, but not in a
way that our process can easily be aware of.  Instead, this particular case could be solved
by storing the offset of the first message in the state (not just the fact that we saw it),
so that on replay of the first message you can determine that, while you have seen an input
from this group, this is the replay of that first input and it should be re-emitted to the
output stream.
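
A sketch of that offset-storing fix, assuming a Samza StreamTask whose
store name, output stream, and grouping key are all illustrative:

import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

class FirstPerGroupTask extends StreamTask with InitableTask {
  // Changelog-backed store mapping group -> offset of the group's first message.
  private var firstOffsets: KeyValueStore[String, String] = _
  private val output = new SystemStream("kafka", "first-messages")

  override def init(config: Config, context: TaskContext): Unit = {
    firstOffsets = context.getStore("first-offsets").asInstanceOf[KeyValueStore[String, String]]
  }

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    val group = envelope.getKey.asInstanceOf[String] // assume the grouping field is the key
    val firstOffset = firstOffsets.get(group)
    if (firstOffset == null || firstOffset == envelope.getOffset) {
      // Either the first message seen for this group, or a post-failure replay
      // of that same first message: (re-)emit it. A bare "seen" flag would
      // wrongly drop the replay; the stored offset disambiguates the two cases.
      collector.send(new OutgoingMessageEnvelope(output, group, envelope.getMessage))
      firstOffsets.put(group, envelope.getOffset)
    }
    // Any other message from a recorded group is a later duplicate; drop it.
  }
}

Note that the send still happens before the put, per the ordering above.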

-Bart




-----Original Message-----
From: Yan Fang [mailto:yanfang724@gmail.com]
Sent: Tuesday, April 7, 2015 4:12 PM
To: dev@samza.apache.org
Subject: Re: consistency between input, output and changelog streams

Hi Bart,

In terms of your assumptions:

* Ts <= To: this is correct. The code backing this assumption is in RunLoop
<https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala>:
commit is called after the process and window methods. E.g., process1
-> commit T1 -> process2 -> fail -> commit T2 will not happen. When
restarting the job, it consumes from T1, and the messages between T1 and T2 will be reprocessed.

* Ti <= Ts is not always true. There are two scenarios (we call db.put() in
process/window):

    1) process1 -> db.put1 succeeds -> commit T1 -> process2 -> db.put2 succeeds
-> the following process fails -> commit T2 will not happen. In this scenario, Ti <=
Ts, because the latest changelog write happens later than the checkpoint. When
we reprocess the stream from T1, the same operation will happen twice, because db.put2 already
succeeded. Putting the same data twice, or deleting the same data twice, is usually fine;
it may cause issues if db.put2 accumulates based on its previous value (see the sketch
after these two scenarios). -- @Chris, is this true?

    2) process1 -> db.put1 -> commit T1 -> process2 -> fail. This is Ti >=
Ts, because the latest checkpoint happens after the changelog write.
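
To make the accumulation concern concrete, a minimal sketch against a
KeyValueStore (store names and value types are illustrative):

import org.apache.samza.storage.kv.KeyValueStore

object PutIdempotency {
  // Overwriting put: replaying process2 rewrites the same value, so the
  // replay is harmless (idempotent).
  def recordSeen(seen: KeyValueStore[String, String], group: String, offset: String): Unit =
    seen.put(group, offset)

  // Read-modify-write put: replaying process2 increments again, so the count
  // ends up one too high after recovery (not idempotent).
  def incrementCount(counts: KeyValueStore[String, java.lang.Integer], group: String): Unit = {
    val old = Option(counts.get(group)).map(_.intValue).getOrElse(0)
    counts.put(group, old + 1)
  }
}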

    The changelog code is in LoggedStore <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala>:
you can see that a changelog entry is written for each db operation (put/delete, etc.),
and all db operations are called in the process or window methods.
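
The gist of that wrapper, heavily simplified and with illustrative names
(see the linked file for the real code):

import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.MessageCollector

// Simplified shape of a changelog-backed store: every mutation is also sent
// to the changelog stream, so the durable changelog trails the local store
// by at most the producer's un-flushed buffer.
class SimplifiedLoggedStore[K <: AnyRef, V <: AnyRef](store: KeyValueStore[K, V],
                                                      changelog: SystemStream,
                                                      collector: MessageCollector) {
  def put(key: K, value: V): Unit = {
    collector.send(new OutgoingMessageEnvelope(changelog, key, value))
    store.put(key, value)
  }

  def delete(key: K): Unit = {
    collector.send(new OutgoingMessageEnvelope(changelog, key, null)) // null message = tombstone
    store.delete(key)
  }

  def get(key: K): V = store.get(key)
}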

Hope this can help you.

Thanks,

Fang, Yan
yanfang724@gmail.com

On Tue, Apr 7, 2015 at 7:27 AM, Bart Wyatt <bart.wyatt@dsvolition.com>
wrote:

> We are trying to make sure that we are handling the proper edge cases
> in our stateful tasks so that our processes can survive failure well.
>
> Given that the changelog will recreate the KV store (state) up to the
> point in time of the last durable changelog write (Ts), that the
> checkpoint will restart input from the point in time represented by the
> last durable checkpoint write (Ti), and that the output will contain
> messages up to a third point in time, that of the last durable output
> write (To), our current assumption is that in all recovery cases:
>
> Ti <= Ts <= To
>
> This means that some input may be "replayed" from the point of view of
> the KV store, which is handled by normal at-least-once-delivery
> processing, and that we may duplicate output messages that would have
> been produced between Ts and To, which is also consistent with
> at-least-once delivery.
>
> However, I cannot find code that backs these assumptions, and I'm hoping
> I've just missed it, because:
>
> If To < Ts, then we may drop output because the state assumed the output
> was already written when, due to the timing of actual writes to Kafka or
> durability concerns, it is not there.  This is important, for example,
> for a job that emits "session started @ X" messages on the first message
> for any given session key.  The state will see a repeated message as a
> duplicate and not emit the output.  I think this is solvable in the job
> as long as To >= Ti, but I am not certain the solution is generally
> applicable to tasks where side effects of previous input exist in the
> state and have an effect on future output.
>
> If Ts < Ti, then our stateful task will effectively drop input, even
> though it may have produced some or all of the output for those messages
> in its previous incarnation, as the state used for all future processing
> will not contain the side effects of processing the messages between Ts
> and Ti. We see no solution for this at the task level, as it would
> require collusion between two backing systems (checkpoints and
> changelogs) to correct, presumably by rewinding Ti to Ts.
>
> Perhaps my code search failed because I was expecting some colluding
> system that would wait for output to be written out before writing
> changelog entries, and then again before checkpoints, and that was too
> presumptive.  Is there something about the code, the assumptions, or my
> edge-case analysis that I've missed that addresses this?
>
> -Bart
>
>

