samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ben Kirwin <>
Subject Determinism and exactly-once semantics
Date Mon, 29 Sep 2014 19:41:24 GMT
Hello all,

I just came across the recent discussion on 'Trying to achieve
deterministic behavior on recovery/rewind'. I've been thinking about this
myself quite a bit recently, so I thought I'd pass along a few observations
in case anyone else finds them useful. (I've just joined the mailing list,
so I can't reply to that conversation directly -- sorry! Apologies as well
if this isn't the appropriate spot for this.)

Jay Kreps mentioned a few extensions to Kafka that should support an
exactly-once semantics. In many circumstances, it's *also* possible to do
this entirely in-band, without making more assumptions about the underlying
transport. (In other words, they can be implemented on top of the current
Kafka release.) I think, but am not certain, that Samza would need some
minimal changes to expose this to the user.

Some context: I've been experimenting with the design a high-level
dataflowy streaming framework, built around Kafka's partitioned logs. I've
been looking at using Samza as a backend for this, where the dataflow graph
gets compiled down to a set of Samza jobs -- so if my priorities seem
strange, it's because I'm motivated by this particular use-case.

For example, I sometimes rely on the idea of a producer 'owning' a
partition. This is a similar idea to the way a consumer might be the only
one in a group to read from a partition: if a producer owns a partition, it
means it's the only producer *anywhere* that ever writes to it. (For
example, a Samza task owns its checkpoint stream.) This turns out to
simplify a lot of reasoning, and it's often not too hard to arrange if
you're looking out for it, but a lot of jobs in the wild aren't set up this

Assuming (for a moment) that our tasks are deterministic, there's a couple
strategies that give you exactly-once behaviour:

If a task owns the partition it's writing to, this behaviour is pretty easy
to get. Periodically, as the task is running, we can 'checkpoint' the
offset of the last message we wrote *out*, along with the input offset. If
the task fails, we can just look under the output partition and see what
the last offset written was, then rewind until the most recent checkpoint
before that and continue from there. If the last offset in our output
partition is `n`, and the last output offset in our checkpoint is `m`, we
know that the next `n-m` messages are duplicates -- so we can just drop
them entirely.

Even if our task doesn't own its output partition, you can still get
exactly-once semantics with a variant on the 'Idempotent Producer' proposal
from here:

To make this work without upstream Kafka changes, you need to:
- Stick the pid / sequence number info in the message itself
- Do the deduplication on the consumer side

Notice that, if you want a clean topic with no duplicates or metadata, you
can follow this with a second job that uses the first strategy to dedup it
and write the results into a new topic.

The easiest way to extend this to nondeterministic streams is to just make
those streams deterministic: you just need to log all your 'choices' before
making them visible to the rest of the system, so you can replay them if
something goes wrong. For example, if the nondeterminism is from the
message arrival order, it's enough to take Chris Riccomini's suggestion and
log whenever the MessageChooser chooses a stream. In the worst case -- say,
if you have some nondeterministic code that sends messages and mutates
state -- you need to capture all output and state changes, and make sure
they're all logged together before continuing. Compared to the
'Transactional Messaging' proposal for Kafka, I think this: has similar
latency and correctness guarantees, is equally amenable to batching if
necessary, sends more data over the wire, requires no new extensions to the
messaging layer, and is 'pay-what-you-use' -- in the sense that if only a
part of your task is nondeterministic, you only have to capture that
particular bit in the log.

I understand that many people are not going to care about this kind of
fine-grained control, and that a switch to flip that says 'make this
arbitrary code I wrote deterministic' is very useful. But it would be cool
if the kind of control I'm talking about were possible with Samza, both for
people like me and those folks who are willing to work a little harder to
get low latency without sacrificing correctness.

For me, I think the only thing missing right now is a little more control
over the incoming messages. For example, the first exactly-once strategy I
outlined above needed to bootstrap itself from a stream of (input offset,
output offset) pairs, and then take that most recent input offset and start
consuming from *there* -- instead of whatever's in Samza's checkpoint
stream. I can't find a way to do this in Samza with acceptable performance.
On the other hand, I'm pretty sure a lower-level mechanism that allowed you
to 'pull' or 'request' particular input could support both my unusual
requirements and the existing checkpoint / message chooser / bootstrap
stream machinery on top.

I'd be happy to flesh any of this out a bit more, if anyone is interested.
If I'm misunderstanding something, I'd also be very happy for any
corrections, etc.

Oh, also: I'm very impressed with all the work that's gone into Samza, and
how seriously it takes fault tolerance and state management -- thanks!

-- Ben

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message