samza-dev mailing list archives

From Chris Riccomini <criccom...@linkedin.com.INVALID>
Subject Re: Trying to achieve deterministic behavior on recovery/rewind
Date Tue, 02 Sep 2014 18:00:09 GMT
Hey Roger,

Thanks for the feedback on the docs. Based on your questions, I can tell
that you have been thorough in your reading! I'm going to try and answer
your questions as best I can, but the truth is, we haven't (yet) thought a
lot about how to implement time ordering in Samza, so think critically
about what I say.

As I understand your first question about determinism, you are defining
determinism as, "The output will always be exactly the same, even in the
case of failure." This is certainly the most desirable deterministic
behavior one could hope for.

The way we've been defining determinism in Samza is that there are many
potential "correct" outputs at any given point in time. In our definition,
it is possible to send one correct output during processing, then fail,
re-process, and pick a second (but different) correct output. The
key is not to materialize two "correct outputs", since this would result
in duplication: downstream consumers should only ever see one of the
correct answers.

In the example you provide, we would say that there are two correct
answers:

{product_id: 99, email: "foo1@bar.com", ...}


Or:

{product_id: 99, email: "foo2@bar.com", ...}


It would not be correct to see both in the output stream, though. We
think it is "correct" to send foo2@bar.com as the final output because
that could have been the output even before the failure occurred: the
timing between the two streams (orders and user_info) is undefined. You've
defined that the consumer read the messages in this order:

t1: User updates her email to "foo1@bar.com"
t2: User buys a pair of jeans (product_id == 99)
t3: User updates her email to "foo2@bar.com"


But it's just as possible that the consumer could have read the messages
in this order:

t1: User updates her email to "foo1@bar.com"
t2: User updates her email to "foo2@bar.com"
t3: User buys a pair of jeans (product_id == 99)


Samza (or Kafka) could introduce some concept of time to fix this. You
could come up with a deterministic MessageChooser that picks messages
based on a timestamp *in* the message. The two options on who assigns this
timestamp (that I can think of) are the producer or the broker. The broker
seems more desirable, since it will apply a totally ordered timestamp
across all messages in a single partition, while having the producer
assign timestamps can cause messages to be written to Kafka out of
timestamp order (since each producer's clock can differ).
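
To make this concrete, here is a rough sketch of what such a chooser
might look like against the MessageChooser API (I'm writing this from
memory, so double-check the interface). The TimestampedMessage type and
its getTimestampMs() accessor are made up for illustration; neither
Samza nor Kafka provides an embedded timestamp today:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.chooser.BaseMessageChooser;

    public class TimestampMessageChooser extends BaseMessageChooser {
      /** Hypothetical message type carrying an embedded timestamp. */
      public interface TimestampedMessage {
        long getTimestampMs();
      }

      // The MessageChooser contract guarantees at most one outstanding
      // envelope per partition, so a simple map works as the buffer.
      private final Map<SystemStreamPartition, IncomingMessageEnvelope> buffered =
        new HashMap<SystemStreamPartition, IncomingMessageEnvelope>();

      @Override
      public void update(IncomingMessageEnvelope envelope) {
        buffered.put(envelope.getSystemStreamPartition(), envelope);
      }

      @Override
      public IncomingMessageEnvelope choose() {
        IncomingMessageEnvelope oldest = null;
        for (IncomingMessageEnvelope candidate : buffered.values()) {
          if (oldest == null || timestampOf(candidate) < timestampOf(oldest)) {
            oldest = candidate;
          }
        }
        if (oldest != null) {
          buffered.remove(oldest.getSystemStreamPartition());
        }
        return oldest; // null means "nothing to process right now"
      }

      private long timestampOf(IncomingMessageEnvelope envelope) {
        return ((TimestampedMessage) envelope.getMessage()).getTimestampMs();
      }
    }

Note that this greedy version just picks among whatever happens to be
buffered. It doesn't address the case where one stream has no message
available yet, which is exactly where things get tricky.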

Let's assume that the broker assigns it on a per-partition basis. Let's
also assume that in your example, the user_info partition is on a
different broker from the orders partition, and that each stream is just
one partition. In this case, broker1 assigns time t1 and t3 to the
user_info stream messages, and broker2 assigns time t2 to the orders
stream message. The problem is that the consumer could have read the
messages for t1 and t3, but not yet for t2, since they're from different
brokers. In such a case, what is the desirable behavior for the
MessageChooser? As I see it, there are two choices: block forever until a
message from the orders stream comes in, or time-bound the decision, and
move forward without a message from orders after some time bound. If you
take the former approach, you can introduce a lot of latency on the
user_info stream, but I think you're right: you'll always process messages
in exactly the same order. I think the same holds true for the change log.
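
Continuing the hypothetical chooser above, the time-bounded variant
might look roughly like this. The maxWaitMs knob is made up, not a real
config; set it effectively infinite and you get the block-forever
behavior instead:

    import java.util.HashSet;
    import java.util.Set;

    // Additions to the TimestampMessageChooser sketch above: track the
    // registered partitions so choose() can tell when a stream is lagging.
    private final Set<SystemStreamPartition> registered =
      new HashSet<SystemStreamPartition>();
    private long waitStartMs = -1;
    private long maxWaitMs = 500; // illustrative time bound, not a real config

    @Override
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
      registered.add(systemStreamPartition);
    }

    @Override
    public IncomingMessageEnvelope choose() {
      if (!buffered.keySet().containsAll(registered)) {
        if (waitStartMs < 0) {
          waitStartMs = System.currentTimeMillis(); // start the clock on a gap
        }
        if (System.currentTimeMillis() - waitStartMs < maxWaitMs) {
          return null; // block: some partition has nothing buffered yet
        }
        // Deadline passed: fall through and pick among what we have.
      }
      waitStartMs = -1;
      return pickOldest(); // the same timestamp scan as in the sketch above
    }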

Seems to me like this is worth opening a JIRA to discuss.

>>> I'm assuming that change log commits for local state and checkpoints
>>>are not done together in an atomic transaction, so they may not always
>>>match.


You're right that they may not always match today: Kafka currently does
not support atomic transactions, but there is a JIRA/design/doc/patch up.
Once this feature is committed, we will use it to commit the change log
and checkpoint together, as you describe.

>>> Is it possible to write an "EarliestFirstChooser" that always chooses
>>>the oldest message available according to the timestamp at which it was
>>>received by the Kafka broker?

Yes, this should be possible, and it would look a lot like the chooser
sketched above. The catch is where that broker timestamp would come from:

>>> I don't know if Kafka stores a timestamp with each message but I'm
>>>assuming it does because it supports an API on the simple consumer
>>>called getOffsetsBefore() that would seem to map from timestamps to
>>>offsets.

It doesn't keep track of anything like wall-clock time on a per-message
basis. Each message does have an "offset", which is a per-partition
logical measure of how far into a stream the message is. It starts at 0,
and increments once for each message written. I *think* that this is of
little use for ordering, since you could have the offset for the orders
partition be 1000000, while the offset for user_info could be 130.
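
For what it's worth, getOffsetsBefore() against the 0.8 simple consumer
looks roughly like this (from memory; broker host, port, and topic are
placeholders). The broker answers from log segment metadata (segment
boundary offsets and file modification times), not from per-message
timestamps, so it can't give you message-level ordering:

    import java.util.HashMap;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetLookup {
      public static void main(String[] args) {
        SimpleConsumer consumer =
          new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-lookup");
        try {
          Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
          // Ask for at most one offset "before" one hour ago.
          long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000;
          requestInfo.put(new TopicAndPartition("orders", 0),
            new PartitionOffsetRequestInfo(oneHourAgo, 1));
          OffsetRequest request = new OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
          OffsetResponse response = consumer.getOffsetsBefore(request);
          // The answer comes from segment boundaries, not message timestamps.
          long[] offsets = response.offsets("orders", 0);
          System.out.println("Closest segment-boundary offset: " + offsets[0]);
        } finally {
          consumer.close();
        }
      }
    }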

>>> I'm using Samza 0.7.0 but the metrics data has the version as
>>>{"samza-version": "0.0.1"}.  Is this intentional?

I think this is a result of the Samza jars not having the proper
META-INF/version info in them. As I recall, the MetricsSnapshotReporter
looks for the version info somewhere in the META-INF folder inside the
Samza jar. If it doesn't find it, it defaults to 0.0.1. That happens here:

  https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
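
The lookup boils down to something like the standard manifest check
(my rough reconstruction, not the actual Samza code):

    // Rough reconstruction of the fallback described above: read
    // Implementation-Version from META-INF/MANIFEST.MF of the jar that
    // contains the class, defaulting to "0.0.1" when it's absent.
    public class SamzaVersion {
      public static String get() {
        Package pkg = SamzaVersion.class.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return (version != null) ? version : "0.0.1";
      }
    }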


Cheers,
Chris

On 9/2/14 8:52 AM, "Roger Hoover" <roger.hoover@gmail.com> wrote:

>Hi Samza devs,
>
>I think this project has the best documentation I've ever seen!  Amazing
>job.  It's extremely well written and Hello Samza is a really great
>example
>that I was able to run + modify without issue.  It was a joy reading the
>docs and playing around with the example.  Kudos!
>
>After thoroughly reading all the docs, I still have a few questions and
>would appreciate any feedback.
>
>I was thinking about how to support deterministic behavior on recovery or
>rewind.  Maybe it can't always be 100% deterministic but I think we can
>get
>close.  Have other people thought about this?  Is it desirable?
>
>For example, let's say we're joining two streams: orders and user_info.
>As
>orders come in, we use the user_id field of the order to lookup additional
>information about the user and enrich the stream.  Say we're keeping all
>the user_info state in the local KV store.
>
>t1: User updates her email to "foo1@bar.com"
>t2: User buys a pair of jeans (product_id == 99)
>t3: User updates her email to "foo2@bar.com"
>
>In the case of normal operation (no delays in the user_info stream), the
>enriched record will be:
>
>{product_id: 99, email: "foo1@bar.com", ...}
>
>But say that our job fails before it can checkpoint and is configured to
>bootstrap from user_info.  When it gets restarted and bootstraps from the
>user_info stream, it will end up with the email set to "foo2@bar.com" in
>the local KV store.  Then it will reprocess the order event and produce
>the
>"wrong" output:
>
>{product_id: 99, email: "foo2@bar.com", ...}
>
>I haven't verified that but the documentation says "a bootstrap stream
>waits for the consumer to explicitly confirm that the stream has been
>fully
>consumed."  Shouldn't it wait until it's consumed up to the checkpoint
>offset for the bootstrap stream instead (when there is a saved checkpoint
>offset)?
>
>Likewise, for local state replicated in the change log.  During the
>checkpoint process, Samza could include its producer offset in the
>checkpoint data so that during recovery, the local state will be restored
>to a state that corresponds with its offsets for the input streams.
> Everything would be coherent rather than having the input streams
>restored
>to the checkpoint and local state restored to the most recent value.  I'm
>assuming
>that change log commits for local state and checkpoints are not done
>together in an atomic transaction, so they may not always match.
>
>The other missing piece is a nearly deterministic MessageChooser.  During
>recovery + rewind, all the messages in both streams are already present in
>Kafka and we want a way to replay them in the same order as if they were
>played in real-time.  The only way to approximate this behavior that I can
>see is to use Kafka broker timestamps for each message.  Is it possible to
>write an "EarliestFirstChooser" that always chooses the oldest message
>available according to the timestamp at which it was received by the Kafka
>broker?
>
>I don't know if Kafka stores a timestamp with each message but I'm
>assuming
>it does because it supports an API on the simple consumer called
>getOffsetsBefore() that would seem to map from timestamps to offsets.
>
>Finally, a nit pick.  I'm using Samza 0.7.0 but the metrics data has the
>version as {"samza-version": "0.0.1"}.  Is this intentional?
>
>If it makes sense, I can put in some JIRA tickets for this stuff...
>
>Cheers,
>
>Roger

