samza-dev mailing list archives

From Chris Riccomini <>
Subject Re: Trying to achieve deterministic behavior on recovery/rewind
Date Tue, 02 Sep 2014 18:14:18 GMT
Hey Roger,

Another thing that we thought of a while ago is that an alternative would
be to log the MessageChooser's choices. So if it chose M1, M2, M3, M4,
Samza would always re-process the messages in that order. There is
some expense to this approach since you'd have to log the choices, but the
individual choices could be batched together to increase throughput. If
these choices are also committed atomically along with checkpoints, change
log, and output, I think you'd also get deterministic output.

This approach wouldn't rely on a time ordering either, and could even
support non-deterministic MessageChoosers (e.g. using random.nextInt() <
message.hashCode() or something crazy like that).
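A minimal sketch of the logged-choices idea, assuming simplified stand-in types rather than Samza's actual MessageChooser interface (`Buffers`, `processLive` and `replay` are hypothetical names). Only the stream that was picked needs to be logged, because messages within a single partition are always consumed in offset order. In a real job the log would be a Kafka topic committed together with the checkpoint, not an in-memory list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical sketch: log every chooser decision so that a replay re-processes
 * messages in exactly the same order, even if the live chooser was random.
 * Simplified stand-in types; not Samza's MessageChooser interface.
 */
public class LoggedChooserDemo {

    /** Pending messages, buffered per stream (one partition each) in offset order. */
    public static final class Buffers {
        final Map<String, Deque<String>> byStream = new LinkedHashMap<>();

        public Buffers add(String stream, String message) {
            byStream.computeIfAbsent(stream, s -> new ArrayDeque<>()).addLast(message);
            return this;
        }

        List<String> readyStreams() {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, Deque<String>> e : byStream.entrySet()) {
                if (!e.getValue().isEmpty()) ready.add(e.getKey());
            }
            return ready;
        }

        String take(String stream) {
            return byStream.get(stream).pollFirst();
        }
    }

    /** Live processing: a non-deterministic choice, appended to choiceLog before use. */
    public static List<String> processLive(Buffers buffers, Random rng, List<String> choiceLog) {
        List<String> processed = new ArrayList<>();
        List<String> ready = buffers.readyStreams();
        while (!ready.isEmpty()) {
            String stream = ready.get(rng.nextInt(ready.size())); // random, like the "crazy" chooser
            choiceLog.add(stream); // in practice: batched and committed with the checkpoint
            processed.add(buffers.take(stream));
            ready = buffers.readyStreams();
        }
        return processed;
    }

    /** Replay: ignore the chooser entirely and follow the logged stream choices. */
    public static List<String> replay(Buffers buffers, List<String> choiceLog) {
        List<String> processed = new ArrayList<>();
        for (String stream : choiceLog) {
            processed.add(buffers.take(stream));
        }
        return processed;
    }

    public static Buffers sampleInput() {
        return new Buffers().add("user_info", "M1").add("orders", "M2")
                            .add("user_info", "M3").add("orders", "M4");
    }

    public static void main(String[] args) {
        List<String> choiceLog = new ArrayList<>();
        List<String> live = processLive(sampleInput(), new Random(), choiceLog);
        List<String> replayed = replay(sampleInput(), choiceLog);
        System.out.println(live + " replayed as " + replayed + ", same order: " + live.equals(replayed));
    }
}
```

Whatever order the random chooser picks, the replay reproduces it, which is why this approach does not need a time ordering.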


On 9/2/14 11:00 AM, "Chris Riccomini" <> wrote:

>Hey Roger,
>Thanks for the feedback on the docs. Based on your questions, I can tell
>that you have been thorough in your reading! I'm going to try and answer
>your questions as best I can, but the truth is, we haven't (yet) thought a
>lot about how to implement time ordering in Samza, so think critically
>about what I say.
>As I understand your first question about determinism, you are defining
>determinism as, "The output will always be exactly the same, even in the
>case of failure." This is certainly the most desirable deterministic
>behavior one could hope for.
>The way we've been defining determinism in Samza is that there are many
>potential "correct" outputs at any given point in time. In our definition,
>it is possible to send one correct output during processing, then fail,
>re-process, and pick a second (different) correct output. The
>key is not to materialize two "correct outputs", since this would result
>in duplication: downstream consumers should only ever see one of the
>correct answers.
>In the example you provide, we would say that there are two correct outputs:
>{product_id: 99, email: "", ...}
>{product_id: 99, email: "", ...}
>It would not be correct to see both in the output stream, though. The
>reason we think that it is "correct" to send as the final
>output is because it is possible that this could have been the output
>before the failure occurred. The reason that I say this is that the timing
>between the two streams (orders and user_info) is undefined. You've
>defined that the consumer read the messages in this order:
>t1: User updates her email to ""
>t2: User buys a pair of jeans (product_id == 99)
>t3: User updates her email to ""
>But it's just as possible that the consumer could have read the messages
>in this order:
>t1: User updates her email to ""
>t2: User updates her email to ""
>t3: User buys a pair of jeans (product_id == 99)
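To make the two orderings concrete, here is a hypothetical sketch of the join that enriches orders from a local map of user_info updates. The email addresses are placeholders (the real ones are not shown in this archive), and a `HashMap` stands in for Samza's key-value store. Both runs are valid interleavings of the same input, yet they emit different records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the orders/user_info join for a single user (id 1). */
public class JoinOrderDemo {

    /** A message from either the user_info or the orders stream. */
    public static final class Event {
        final String stream;
        final String value; // an email for user_info, a product_id for orders

        public Event(String stream, String value) {
            this.stream = stream;
            this.value = value;
        }
    }

    /** Processes events in the given order and returns the enriched order records. */
    public static List<String> run(List<Event> events) {
        Map<Integer, String> emailByUser = new HashMap<>(); // stand-in for the local KV store
        List<String> enriched = new ArrayList<>();
        for (Event e : events) {
            if (e.stream.equals("user_info")) {
                emailByUser.put(1, e.value); // latest email wins
            } else {
                enriched.add("{product_id: " + e.value + ", email: \"" + emailByUser.get(1) + "\"}");
            }
        }
        return enriched;
    }

    public static void main(String[] args) {
        Event oldEmail = new Event("user_info", "old@example.com");
        Event order = new Event("orders", "99");
        Event newEmail = new Event("user_info", "new@example.com");
        // Order read between the two updates:
        System.out.println(run(List.of(oldEmail, order, newEmail))); // prints [{product_id: 99, email: "old@example.com"}]
        // Order read after both updates:
        System.out.println(run(List.of(oldEmail, newEmail, order))); // prints [{product_id: 99, email: "new@example.com"}]
    }
}
```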
>Samza (or Kafka) could introduce some concept of time to fix this. You
>could come up with a deterministic MessageChooser that picks messages
>based on a timestamp *in* the message. The two options for who assigns this
>timestamp (that I can think of) are the producer or the broker. The broker
>seems more desirable since it will apply a totally ordered timestamp
>across all messages in a single partition, while having the producer
>assign timestamps can cause messages to be written to Kafka out of order
>(since each producer will have a different timestamp).
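A timestamp-ordered chooser could look roughly like the following. This is a sketch under two assumptions: each message carries a broker-assigned timestamp (a hypothetical field here, since Kafka does not track per-message wall-clock time, as noted further down), and timestamps from different brokers are comparable, which requires their clocks to be reasonably in sync. `Envelope` and `EarliestFirstChooser` are simplified stand-ins, not Samza classes; ties are broken by stream name so the choice stays deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of an earliest-timestamp-first chooser. */
public class EarliestFirstChooserDemo {

    /** Stand-in for an incoming envelope carrying a hypothetical broker timestamp. */
    public static final class Envelope {
        public final String stream;
        public final long timestamp;
        public final String payload;

        public Envelope(String stream, long timestamp, String payload) {
            this.stream = stream;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Orders by timestamp, breaking ties by stream name so the choice is deterministic. */
    static final Comparator<Envelope> EARLIEST_FIRST = (a, b) -> {
        int byTime = Long.compare(a.timestamp, b.timestamp);
        return byTime != 0 ? byTime : a.stream.compareTo(b.stream);
    };

    public static final class EarliestFirstChooser {
        private final PriorityQueue<Envelope> pending = new PriorityQueue<>(EARLIEST_FIRST);

        /** Called as messages arrive from the consumer, in any cross-stream order. */
        public void update(Envelope envelope) {
            pending.add(envelope);
        }

        /** Returns the oldest buffered message, or null if nothing is buffered. */
        public Envelope choose() {
            return pending.poll();
        }
    }

    public static void main(String[] args) {
        EarliestFirstChooser chooser = new EarliestFirstChooser();
        // Arrival order differs from timestamp order (different brokers, different lag).
        chooser.update(new Envelope("user_info", 1, "email update"));
        chooser.update(new Envelope("user_info", 3, "email update"));
        chooser.update(new Envelope("orders", 2, "product_id 99"));
        List<Long> order = new ArrayList<>();
        for (Envelope e = chooser.choose(); e != null; e = chooser.choose()) {
            order.add(e.timestamp);
        }
        System.out.println(order); // prints [1, 2, 3]
    }
}
```

Note that this only orders messages that have already been buffered; if the orders message for t2 has not arrived yet, it would happily return t3 right after t1.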
>Let's assume that the broker assigns it on a per-partition basis. Let's
>also assume that in your example, the user_info partition is on a
>different broker from the orders partition, and that each stream is just
>one partition. In this case, broker1 assigns time t1 and t3 to the
>user_info stream messages, and broker2 assigns time t2 to the orders
>stream message. The problem is that the consumer could have read the
>messages for t1 and t3, but not yet for t2, since they're from different
>brokers. In such a case, what is the desirable behavior for the
>MessageChooser? As I see it, there are two choices: block forever until a
>message from the orders stream comes in, or time-bound the decision, and
>move forward without a message from orders after some time bound. If you
>take the former approach, you can introduce a lot of latency on the
>user_info stream, but I think you're right: you'll always process messages
>in exactly the same order. I think the same holds true for the change log.
>Seems to me like this is worth opening a JIRA to discuss.
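The two choices can be expressed as a single hypothetical chooser with a wait bound: it releases the earliest message only once every stream has something buffered, or once that message has waited longer than `maxWaitMillis`. Setting the bound to `Long.MAX_VALUE` effectively gives the block-forever behavior. The `arrivedAtMillis` field and the explicit `nowMillis` parameter are assumptions made to keep the sketch testable; this is not Samza's API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: earliest-first choosing with an optional time bound. */
public class BoundedChooserDemo {

    public static final class Envelope {
        public final String stream;
        public final long timestamp;       // hypothetical broker-assigned timestamp
        public final long arrivedAtMillis; // local time the consumer received it

        public Envelope(String stream, long timestamp, long arrivedAtMillis) {
            this.stream = stream;
            this.timestamp = timestamp;
            this.arrivedAtMillis = arrivedAtMillis;
        }
    }

    public static final class BoundedEarliestFirstChooser {
        private final long maxWaitMillis; // Long.MAX_VALUE: wait until every stream has data
        private final Map<String, Deque<Envelope>> pending = new LinkedHashMap<>();

        /** All streams must be registered up front. */
        public BoundedEarliestFirstChooser(List<String> streams, long maxWaitMillis) {
            this.maxWaitMillis = maxWaitMillis;
            for (String s : streams) pending.put(s, new ArrayDeque<>());
        }

        public void update(Envelope e) {
            pending.get(e.stream).addLast(e);
        }

        /** Returns the next message to process, or null to keep waiting. */
        public Envelope choose(long nowMillis) {
            Envelope earliest = null;
            boolean everyStreamHasData = true;
            for (Deque<Envelope> queue : pending.values()) {
                Envelope head = queue.peekFirst();
                if (head == null) {
                    everyStreamHasData = false;
                } else if (earliest == null || head.timestamp < earliest.timestamp) {
                    earliest = head; // ties go to the stream registered first
                }
            }
            if (earliest == null) return null;
            // If every stream has data (and timestamps never decrease within a stream),
            // no stream can still deliver an older message: the order is deterministic.
            // If we time out instead, we move on without the lagging stream and lose that guarantee.
            boolean timedOut = nowMillis - earliest.arrivedAtMillis >= maxWaitMillis;
            if (everyStreamHasData || timedOut) {
                return pending.get(earliest.stream).pollFirst();
            }
            return null;
        }
    }

    public static void main(String[] args) {
        BoundedEarliestFirstChooser chooser =
                new BoundedEarliestFirstChooser(List.of("user_info", "orders"), 100);
        chooser.update(new Envelope("user_info", 1, 0));
        chooser.update(new Envelope("user_info", 3, 0));
        System.out.println(chooser.choose(10));            // null: orders might still send something older
        System.out.println(chooser.choose(150).timestamp); // 1: released after the 100 ms bound
    }
}
```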
>>>> I'm assuming that change log commits for local state and checkpoint
>>>>are done together in an atomic transaction so that they may not always match.
>This will be true in the future. Kafka currently does not support atomic
>transactions, but there is a JIRA/design/doc/patch up. Once this feature
>is committed, we will use it as you describe.
>>>> Is it possible to write an "EarliestFirstChooser" that always chooses
>>>>the oldest message available according to the timestamp it was received
>>>>by the Kafka broker?
>Yes, this should be possible.
>>>> I don't know if Kafka stores a timestamp with each message but I'm
>>>>assuming it does because it supports an API on the simple consumer
>>>>called getOffsetsBefore() that would seem to map from timestamps to offsets.
>It doesn't keep track of anything like wall-clock time on a per-message
>basis. Each message does have an "offset", which is a per-partition
>logical measure of how far into a stream the message is. It starts at 0,
>and increments once for each message written. I *think* that this is of
>little use for ordering, since you could have the offset for the orders
>partition be 1000000, while the offset for user_info could be 130.
>>>> I'm using Samza 0.7.0 but the metrics data has the version as
>>>>{"samza-version": "0.0.1"}.  Is this intentional?
>I think this is a result of the Samza jars not having the proper
>META/version info in them. As I recall, the MetricsSnapshotReporter looks
>for the version info somewhere in the META-INF folder inside the Samza
>jar. If it doesn't find it, it defaults to 0.0.1. That happens here:
>On 9/2/14 8:52 AM, "Roger Hoover" <> wrote:
>>Hi Samza devs,
>>I think this project has the best documentation I've ever seen!  Amazing
>>job.  It's extremely well written and Hello Samza is a really great example
>>that I was able to run + modify without issue.  It was a joy reading the
>>docs and playing around with the example.  Kudos!
>>After thoroughly reading all the docs, I still have a few questions and
>>would appreciate any feedback.
>>I was thinking about how to support deterministic behavior on recovery or
>>rewind.  Maybe it can't always be 100% deterministic but I think we can get
>>close.  Have other people thought about this?  Is it desirable?
>>For example, let's say we're joining two streams: orders and user_info.
>>As orders come in, we use the user_id field of the order to look up
>>information about the user and enrich the stream.  Say we're keeping all
>>the user_info state in the local KV store.
>>t1: User updates her email to ""
>>t2: User buys a pair of jeans (product_id == 99)
>>t3: User updates her email to ""
>>In the case of normal operation (no delays in the user_info stream), the
>>enriched record will be:
>>{product_id: 99, email: "", ...}
>>But say that our job fails before it can checkpoint and is configured to
>>bootstrap from user_info.  When it gets restarted and bootstraps from the
>>user_info stream, it will end up with the email set to "" in
>>the local KV store.  Then it will reprocess the order event and produce
>>"wrong" output:
>>{product_id: 99, email: "", ...}
>>I haven't verified that, but the documentation says "a bootstrap stream
>>waits for the consumer to explicitly confirm that the stream has been
>>consumed."  Shouldn't it wait until it's consumed up to the checkpoint
>>offset for the bootstrap stream instead (when there is a saved checkpoint)?
>>Likewise for local state replicated in the change log: during the
>>checkpoint process, Samza could include its producer offset in the
>>checkpoint data so that, during recovery, the local state will be restored
>>to a state that corresponds with its offsets for the input streams.
>>Everything would be coherent, rather than having the input streams restored
>>to the checkpoint and local state restored to the most recent value.  I'm
>>assuming that change log commits for local state and checkpoint are done
>>together in an atomic transaction so that they may not always match.
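Both suggestions amount to restoring state only up to the position recorded in the checkpoint. A hypothetical sketch, assuming a checkpoint that stores the input offsets and the change log offset together (offsets are list indexes, inclusive, with -1 meaning nothing processed). This is not Samza's checkpoint format, and the email values are placeholders. With the last checkpoint taken after t1, the restored store holds the t1 email, so the reprocessed order gets the same enrichment as before the failure.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: restore local state only up to the offsets recorded in
 * the checkpoint, instead of up to the end of the stream or change log.
 */
public class CoherentCheckpointDemo {

    /** A key/value write: a user_info update or a change log entry. */
    public static final class Write {
        public final String key;
        public final String value;

        public Write(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Input offsets and change log offset, written together (inclusive; -1 = none). */
    public static final class Checkpoint {
        public final Map<String, Long> inputOffsets;
        public final long changelogOffset;

        public Checkpoint(Map<String, Long> inputOffsets, long changelogOffset) {
            this.inputOffsets = inputOffsets;
            this.changelogOffset = changelogOffset;
        }
    }

    /** Replays log entries 0..upToInclusive into a fresh store; later entries are ignored. */
    public static Map<String, String> replayUpTo(List<Write> log, long upToInclusive) {
        Map<String, String> store = new HashMap<>();
        for (int offset = 0; offset < log.size() && offset <= upToInclusive; offset++) {
            Write w = log.get(offset);
            store.put(w.key, w.value);
        }
        return store;
    }

    /** Bootstrap stream: stop at the checkpointed input offset, not at the stream's end. */
    public static Map<String, String> bootstrap(List<Write> stream, String streamName, Checkpoint cp) {
        return replayUpTo(stream, cp.inputOffsets.getOrDefault(streamName, -1L));
    }

    /** Change log: stop at the change log offset stored alongside the input offsets. */
    public static Map<String, String> restoreChangelog(List<Write> changelog, Checkpoint cp) {
        return replayUpTo(changelog, cp.changelogOffset);
    }

    public static void main(String[] args) {
        // user_info: offset 0 = email set at t1, offset 1 = email set at t3
        List<Write> userInfo = List.of(new Write("user-1", "old@example.com"),
                                       new Write("user-1", "new@example.com"));
        // Last checkpoint: after t1, before the order at t2 was processed.
        Checkpoint cp = new Checkpoint(Map.of("user_info", 0L, "orders", -1L), 0L);
        System.out.println(bootstrap(userInfo, "user_info", cp).get("user-1")); // prints old@example.com
    }
}
```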
>>The other missing piece is a nearly deterministic MessageChooser.  During
>>recovery + rewind, all the messages in both streams are already present in
>>Kafka and we want a way to replay them in the same order as if they were
>>played in real time.  The only way to approximate this behavior that I
>>see is to use Kafka broker timestamps for each message.  Is it possible to
>>write an "EarliestFirstChooser" that always chooses the oldest message
>>available according to the timestamp at which it was received by the Kafka
>>broker?  I don't know if Kafka stores a timestamp with each message, but
>>I'm assuming it does because it supports an API on the simple consumer
>>called getOffsetsBefore() that would seem to map from timestamps to offsets.
>>Finally, a nitpick.  I'm using Samza 0.7.0 but the metrics data has the
>>version as {"samza-version": "0.0.1"}.  Is this intentional?
>>If it makes sense, I can put in some JIRA tickets for this stuff...
