samza-dev mailing list archives

From Roger Hoover <>
Subject Re: Trying to achieve deterministic behavior on recovery/rewind
Date Wed, 03 Sep 2014 16:43:32 GMT

Thank you for the feedback.  Responses inline.

On Tue, Sep 2, 2014 at 2:09 PM, Chinmay Soman <>

> That's interesting!
> 1) Deterministic reading from a bootstrap stream:
> We could define a changelog for the local state (which in turn is
> populated using a bootstrap stream). If the job fails at this point,
> ideally, it should be restored using a changelog stream (instead of
> bootstrapping again) in order for the job to be deterministic (as you
> suggest). Thus there could be a check which either selects the bootstrap
> mode or the changelog restore mode (depending on whether a changelog
> exists).  I'm not sure if this check exists today (I would guess no).

Yes, I was wondering about this for event-table join.   If you're only
storing the join table in the local store then the change log stream and
the bootstrap stream are duplicates of each other.  One of them is
unnecessary unless you add additional state to the local store.  In any
case, I'm wondering if bootstrapping is ever necessary on recovery or
rewind.  Seems like it's only needed for cold start.
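
A minimal sketch of why a full bootstrap on recovery can change the join output, using the orders/user_info example from the original message quoted below. Everything here is a toy simulation rather than Samza code: the `restore_table` and `enrich` helpers, the offsets, and the email values are hypothetical placeholders.

```python
# Hypothetical log of the user_info stream: (offset, user_id, email).
# The email values are placeholders; the real ones are not in this thread.
USER_INFO = [
    (0, "u1", "first@example.com"),   # t1
    (1, "u1", "second@example.com"),  # t3, arrives after the order at t2
]

ORDER = {"user_id": "u1", "product_id": 99}  # t2


def restore_table(log, up_to_offset=None):
    """Replay the log into a dict; stop after up_to_offset if one is given."""
    table = {}
    for offset, user_id, email in log:
        if up_to_offset is not None and offset > up_to_offset:
            break
        table[user_id] = email
    return table


def enrich(order, table):
    """Join the order against the restored user table."""
    return {"product_id": order["product_id"], "email": table.get(order["user_id"])}


# Full bootstrap on recovery: the table reflects t3 before the order is replayed.
full = enrich(ORDER, restore_table(USER_INFO))

# Restore only up to the checkpointed offset (assumed to be 0): the table reflects t1.
bounded = enrich(ORDER, restore_table(USER_INFO, up_to_offset=0))

print(full)
print(bounded)
```

Only the bounded restore reproduces what was emitted during normal operation, which is why reading to the end of the stream looks necessary only on a cold start.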

> 2) Deterministic changelog:
> You're right - there could be a (smallish) window where we re-process some
> of the input records on a container restart. This can happen since the
> changelog can be (a little ahead) of the last checkpoint for a given input
> stream. However, I would argue the changelog is still deterministic in this
> case. Since currently Samza only guarantees at-least-once semantics, this
> seems to be OK

Hmmm...seems like this will violate the correctness definition that Chris
outlined where there may be two or more "correct" choices but the system
would guarantee that only one will be produced.  But now that you mention
it, I don't know if that can ever be guaranteed with an at-least-once
system.  If a task can always see duplicates, it may process the first with
its local state in state S1 then modify its state to S2 and then process
the duplicate.
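
To make the S1/S2 point concrete, here is a toy sketch of a stateful task that sees the same message twice. `CountingTask` is a hypothetical stand-in, not a Samza class, and the counter is just the simplest state that changes on every message.

```python
class CountingTask:
    """Toy stateful task: tags each output with how many messages it has seen."""

    def __init__(self):
        self.seen = 0  # local state

    def process(self, msg):
        self.seen += 1  # the state moves from S1 to S2 as a side effect
        return (msg, self.seen)


task = CountingTask()
first = task.process("order-99")      # processed against state S1
duplicate = task.process("order-99")  # redelivered, processed against state S2

print(first, duplicate)
```

The two outputs differ even though the input is identical, so at-least-once delivery alone cannot guarantee a single "correct" output whenever the output depends on mutable local state.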

> 3) Deterministic MessageChooser:
> The in-order problem could be avoided, if we restore the state from its
> changelog - which was originally populated by a 'bootstrap stream'. The
> task can then just pick up from where it left off (making the system
> deterministic). Having said that, there might be value in writing an
> 'EarliestFirstChooser'.
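
A rough sketch of the ordering an "EarliestFirstChooser" would aim for. It assumes each message carries a usable timestamp, which this thread leaves open, and it does not use Samza's MessageChooser interface. The buffers, timestamps, and the `earliest_first` helper are hypothetical.

```python
import heapq

# Hypothetical per-stream buffers of (timestamp, stream, payload),
# each already ordered by timestamp within its own stream.
user_info = [(1, "user_info", "email update #1"), (3, "user_info", "email update #2")]
orders = [(2, "orders", "product_id=99")]


def earliest_first(*streams):
    """Yield messages from all streams in ascending timestamp order."""
    return heapq.merge(*streams, key=lambda m: m[0])


replayed = [payload for _, _, payload in earliest_first(user_info, orders)]
print(replayed)
```

With both streams fully present on replay, the order lands between the two email updates, matching the real-time interleaving.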


> Again, this is just my perception (which could be wrong - I'm still
> learning).
> C
> ________________________________________
> From: Roger Hoover []
> Sent: Tuesday, September 02, 2014 8:52 AM
> To:
> Subject: Trying to achieve deterministic behavior on recovery/rewind
> Hi Samza devs,
> I think this project has the best documentation I've ever seen!  Amazing
> job.  It's extremely well written and Hello Samza is a really great example
> that I was able to run + modify without issue.  It was a joy reading the
> docs and playing around with the example.  Kudos!
> After thoroughly reading all the docs, I still have a few questions and
> would appreciate any feedback.
> I was thinking about how to support deterministic behavior on recovery or
> rewind.  Maybe it can't always be 100% deterministic but I think we can get
> close.  Have other people thought about this?  Is it desirable?
> For example, let's say we're joining two streams: orders and user_info.  As
> orders come in, we use the user_id field of the order to lookup additional
> information about the user and enrich the stream.  Say we're keeping all
> the user_info state in the local KV store.
> t1: User updates her email to ""
> t2: User buys a pair of jeans (product_id == 99)
> t3: User updates her email to ""
> In the case of normal operation (no delays in the user_info stream), the
> enriched record will be:
> {product_id: 99, email: "", ...}
> But say that our job fails before it can checkpoint and is configured to
> bootstrap from user_info.  When it gets restarted and bootstraps from the
> user_info stream, it will end up with the email set to "" in
> the local KV store.  Then it will reprocess the order event and produce the
> "wrong" output:
> {product_id: 99, email: "", ...}
> I haven't verified that but the documentation says "a bootstrap stream
> waits for the consumer to explicitly confirm that the stream has been fully
> consumed."  Shouldn't it wait until it's consumed up to the checkpoint
> offset for the bootstrap stream instead (when there is a saved checkpoint
> offset)?
> Likewise, for local state replicated in the change log.  During the
> checkpoint process, Samza could include its producer offset in the
> checkpoint data so that during recovery, the local state will be restored
> to a state that corresponds with its offsets for the input streams.
> Everything would be coherent rather than having the input streams restored
> to the checkpoint and local state restored to the most recent value.  I'm
> assuming that change log commits for local state and checkpoints are not done
> together in an atomic transaction, so they may not always match.
> The other missing piece is a nearly deterministic MessageChooser.  During
> recovery + rewind, all the messages in both streams are already present in
> Kafka and we want a way to replay them in the same order as if they were
> played in real-time.  The only way to approximate this behavior that I can
> see is to use Kafka broker timestamps for each message.  Is it possible to
> write an "EarliestFirstChooser" that always chooses the oldest message
> available according to the timestamp it was received by the Kafka broker?
> I don't know if Kafka stores a timestamp with each message but I'm assuming
> it does because it supports an API on the simple consumer called
> getOffsetsBefore() that would seem to map from timestamps to offsets.
> Finally, a nitpick.  I'm using Samza 0.7.0 but the metrics data has the
> version as {"samza-version": "0.0.1"}.  Is this intentional?
> If it makes sense, I can put in some JIRA tickets for this stuff...
> Cheers,
> Roger
