samza-dev mailing list archives

From Chinmay Soman <cso...@linkedin.com.INVALID>
Subject RE: Trying to achieve deterministic behavior on recovery/rewind
Date Tue, 02 Sep 2014 21:09:41 GMT
That's interesting!

1) Deterministic reading from a bootstrap stream:
We could define a changelog for the local state (which in turn is populated using a bootstrap
stream). If the job fails at this point, ideally, it should be restored using a changelog
stream (instead of bootstrapping again) in order for the job to be deterministic (as you suggest).
Thus there could be a check which either selects the bootstrap mode or the changelog restore
mode (depending on whether a changelog exists).  I'm not sure if this check exists today (I
would guess no).
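As a rough sketch of how those two pieces are wired up today (store and
topic names are made up for illustration, plus the usual serde config for
the store):

    # user_info is fully (re)consumed before any other input is processed
    systems.kafka.streams.user_info.samza.bootstrap=true
    systems.kafka.streams.user_info.samza.reset.offset=true
    systems.kafka.streams.user_info.samza.offset.default=oldest

    # local KV store backing the join, replicated to a changelog topic
    stores.user-info.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
    stores.user-info.changelog=kafka.user-info-changelog

The missing piece would be the check above: prefer restoring from the
changelog topic (when it exists) over re-reading the bootstrap stream.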

2) Deterministic changelog:
You're right - there could be a (smallish) window where we re-process some
of the input records on a container restart. This can happen since the
changelog can be a little ahead of the last checkpoint for a given input
stream. However, I would argue the changelog is still deterministic in this
case. Since Samza currently only guarantees at-least-once semantics, this
seems to be OK.
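To make that window concrete: if the last checkpoint recorded offset 100
for the orders stream, but the store (and hence the changelog) already
reflected messages up through offset 105 at the time of the crash, a
restart restores the state as of 105 and then replays inputs from 101
onward. Those few messages get processed twice, possibly against slightly
newer state, which is within the at-least-once contract.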

3) Deterministic MessageChooser:
The in-order problem could be avoided if we restore the state from its changelog - which
was originally populated by a 'bootstrap stream'. The task can then just pick up from where
it left off (making the system deterministic). Having said that, there might be value in writing
an 'EarliestFirstChooser'.
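As a rough illustration of what that could look like: Samza's pluggable
MessageChooser gives us the skeleton, but since Kafka 0.8 doesn't attach a
broker timestamp to messages, the sketch below assumes the deserialized
payload itself carries an 'event_time' field (a made-up convention), so
treat it as illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.chooser.BaseMessageChooser;

    public class EarliestFirstChooser extends BaseMessageChooser {
      // Samza hands the chooser at most one pending envelope per partition.
      private final Map<SystemStreamPartition, IncomingMessageEnvelope> pending =
          new HashMap<SystemStreamPartition, IncomingMessageEnvelope>();

      @Override
      public void update(IncomingMessageEnvelope envelope) {
        pending.put(envelope.getSystemStreamPartition(), envelope);
      }

      @Override
      public IncomingMessageEnvelope choose() {
        // Pick the buffered envelope with the smallest timestamp.
        IncomingMessageEnvelope oldest = null;
        for (IncomingMessageEnvelope env : pending.values()) {
          if (oldest == null || timestampOf(env) < timestampOf(oldest)) {
            oldest = env;
          }
        }
        if (oldest != null) {
          pending.remove(oldest.getSystemStreamPartition());
        }
        return oldest;  // null means "nothing to process right now"
      }

      @SuppressWarnings("unchecked")
      private long timestampOf(IncomingMessageEnvelope env) {
        // Assumes a map payload with a numeric 'event_time' field.
        Map<String, Object> msg = (Map<String, Object>) env.getMessage();
        return ((Number) msg.get("event_time")).longValue();
      }
    }

It would get plugged in through a MessageChooserFactory referenced from the
task.chooser.class property.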

Again, this is just my perception (which could be wrong - I'm still learning). 
C

________________________________________
From: Roger Hoover [roger.hoover@gmail.com]
Sent: Tuesday, September 02, 2014 8:52 AM
To: dev@samza.incubator.apache.org
Subject: Trying to achieve deterministic behavior on recovery/rewind

Hi Samza devs,

I think this project has the best documentation I've ever seen!  Amazing
job.  It's extremely well written, and Hello Samza is a really great example
that I was able to run + modify without issue.  It was a joy reading the
docs and playing around with the example.  Kudos!

After thoroughly reading all the docs, I still have a few questions and
would appreciate any feedback.

I was thinking about how to support deterministic behavior on recovery or
rewind.  Maybe it can't always be 100% deterministic but I think we can get
close.  Have other people thought about this?  Is it desirable?

For example, let's say we're joining two streams: orders and user_info.  As
orders come in, we use the user_id field of the order to lookup additional
information about the user and enrich the stream.  Say we're keeping all
the user_info state in the local KV store.

t1: User updates her email to "foo1@bar.com"
t2: User buys a pair of jeans (product_id == 99)
t3: User updates her email to "foo2@bar.com"

In the case of normal operation (no delays in the user_info stream), the
enriched record will be:

{product_id: 99, email: "foo1@bar.com", ...}
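For concreteness, the kind of task I have in mind is roughly the sketch
below (the store name "user-info" and the output stream "enriched-orders"
are just placeholders):

    import java.util.Map;

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class OrderEnrichmentTask implements StreamTask, InitableTask {
      private static final SystemStream OUTPUT = new SystemStream("kafka", "enriched-orders");

      private KeyValueStore<String, Map<String, Object>> userInfo;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        userInfo = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-info");
      }

      @Override
      @SuppressWarnings("unchecked")
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
          TaskCoordinator coordinator) {
        String stream = envelope.getSystemStreamPartition().getStream();
        Map<String, Object> msg = (Map<String, Object>) envelope.getMessage();

        if ("user_info".equals(stream)) {
          // Remember the latest profile we've seen for this user.
          userInfo.put((String) msg.get("user_id"), msg);
        } else {
          // An order: enrich it with whatever profile we currently have.
          Map<String, Object> profile = userInfo.get((String) msg.get("user_id"));
          if (profile != null) {
            msg.put("email", profile.get("email"));
          }
          collector.send(new OutgoingMessageEnvelope(OUTPUT, msg));
        }
      }
    }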

But say that our job fails before it can checkpoint and is configured to
bootstrap from user_info.  When it gets restarted and bootstraps from the
user_info stream, it will end up with the email set to "foo2@bar.com" in
the local KV store.  Then it will reprocess the order event and produce the
"wrong" output:

{product_id: 99, email: "foo2@bar.com", ...}

I haven't verified that, but the documentation says "a bootstrap stream
waits for the consumer to explicitly confirm that the stream has been fully
consumed."  Shouldn't it instead wait until it has consumed up to the
checkpoint offset for the bootstrap stream (when there is a saved checkpoint
offset)?

Likewise, for local state replicated in the changelog.  During the
checkpoint process, Samza could include its changelog producer offset in the
checkpoint data so that during recovery, the local state is restored to a
state that corresponds with its offsets for the input streams.  Everything
would be coherent, rather than having the input streams restored to the
checkpoint and the local state restored to the most recent value.  I'm
assuming that changelog commits for local state and checkpoints are not done
together in an atomic transaction, so they may not always match.

The other missing piece is a nearly deterministic MessageChooser.  During
recovery + rewind, all the messages in both streams are already present in
Kafka and we want a way to replay them in the same order as if they were
arriving in real time.  The only way to approximate this behavior that I can
see is to use Kafka broker timestamps for each message.  Is it possible to
write an "EarliestFirstChooser" that always chooses the oldest available
message according to the timestamp at which it was received by the Kafka
broker?

I don't know if Kafka stores a timestamp with each message but I'm assuming
it does because it supports an API on the simple consumer called
getOffsetsBefore() that would seem to map from timestamps to offsets.
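For reference, the lookup I mean is roughly the following (broker host/port
and client id are placeholders); my understanding is that it resolves
timestamps only at log-segment granularity, not per message:

    import java.util.HashMap;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetLookup {
      // Asks the broker for the latest offset before the given timestamp
      // (resolved at segment granularity).
      public static long offsetBefore(String topic, int partition, long timestampMs) {
        SimpleConsumer consumer =
            new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "offset-lookup");
        try {
          Map<TopicAndPartition, PartitionOffsetRequestInfo> request =
              new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
          request.put(new TopicAndPartition(topic, partition),
              new PartitionOffsetRequestInfo(timestampMs, 1));
          OffsetResponse response = consumer.getOffsetsBefore(new OffsetRequest(
              request, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup"));
          return response.offsets(topic, partition)[0];
        } finally {
          consumer.close();
        }
      }
    }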

Finally, a nitpick.  I'm using Samza 0.7.0, but the metrics data reports the
version as {"samza-version": "0.0.1"}.  Is this intentional?

If it makes sense, I can put in some JIRA tickets for this stuff...

Cheers,

Roger
