samza-dev mailing list archives

From Martin Kleppmann
Subject Re: Determinism and exactly-once semantics
Date Tue, 30 Sep 2014 17:48:14 GMT
Hi Ben,

Thanks for your thoughts! A few quick replies:

On 29 Sep 2014, at 20:41, Ben Kirwin wrote:
> Some context: I've been experimenting with the design of a high-level
> dataflowy streaming framework, built around Kafka's partitioned logs. I've
> been looking at using Samza as a backend for this, where the dataflow graph
> gets compiled down to a set of Samza jobs -- so if my priorities seem
> strange, it's because I'm motivated by this particular use-case.

That sounds fun. Would love to hear more about your framework when you're ready to share.

> For example, I sometimes rely on the idea of a producer 'owning' a
> partition. This is a similar idea to the way a consumer might be the only
> one in a group to read from a partition: if a producer owns a partition, it
> means it's the only producer *anywhere* that ever writes to it. (For
> example, a Samza task owns its checkpoint stream.)

In we discussed the idea of "write locks"
on streams, which could be used to enforce such exclusive ownership. Nothing implemented yet.

> Assuming (for a moment) that our tasks are deterministic, there's a couple
> strategies that give you exactly-once behaviour:
> If a task owns the partition it's writing to, this behaviour is pretty easy
> to get. Periodically, as the task is running, we can 'checkpoint' the
> offset of the last message we wrote *out*, along with the input offset.

This is quite like what we had in mind with Idempotent Producer (before the idea of idempotent
producer got generalised into transactions in Kafka).

Our tendency has been that if Kafka is providing some facility, it would be better for Samza
to use that facility, rather than to implement its own version of the same. Of course that
runs counter to wanting to support other message brokers, and also we're deviating from that
principle in some places (even when Kafka adds support for tracking consumer offsets, Samza
will probably keep its own checkpointing implementation).

That's just saying: it's not obvious whether Samza should be implementing something like idempotency,
or whether it's best left to the messaging layer. There are good arguments in either direction.
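
To make the first strategy concrete (checkpointing the output offset together with the input offset), here is a minimal in-memory sketch. Everything in it is hypothetical: `OwnedPartitionTask`, `Checkpoint` and `run` are illustrative names, not Samza or Kafka APIs, and plain lists stand in for partitions. It assumes the task is the only writer to its output partition and that `process` is deterministic, so replaying input from the checkpoint regenerates exactly the outputs that were written after the checkpoint.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical in-memory model; none of these names are Samza or Kafka APIs. */
final class OwnedPartitionTask {

    /** Pairs an input position with the output position reached at the same moment. */
    static final class Checkpoint {
        final int inputOffset;   // index of the next input message to read
        final int outputOffset;  // number of output messages written at checkpoint time

        Checkpoint(int inputOffset, int outputOffset) {
            this.inputOffset = inputOffset;
            this.outputOffset = outputOffset;
        }
    }

    /**
     * Resumes from a checkpoint. Because this task is assumed to be the only
     * writer, output.size() is the true end of the output log. Any regenerated
     * message whose position is below that end was already written before the
     * crash, so it is skipped instead of being appended a second time.
     */
    static Checkpoint run(List<String> input,
                          List<String> output,
                          Checkpoint from,
                          Function<String, List<String>> process) {
        int nextOut = from.outputOffset;
        for (int i = from.inputOffset; i < input.size(); i++) {
            for (String msg : process.apply(input.get(i))) {
                if (nextOut >= output.size()) {
                    output.add(msg);  // genuinely new output
                }
                nextOut++;            // advance whether written or skipped
            }
        }
        return new Checkpoint(input.size(), nextOut);
    }
}
```

If the task crashed after writing the outputs for the second input message but the last checkpoint only covered the first, restarting from that checkpoint reprocesses the second message, skips its outputs because they are already in the log, and appends only the outputs for later messages.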

> To make this work without upstream Kafka changes, you need to:
> - Stick the pid / sequence number info in the message itself
> - Do the deduplication on the consumer side
> Notice that, if you want a clean topic with no duplicates or metadata, you
> can follow this with a second job that uses the first strategy to dedup it
> and write the results into a new topic.

This is an interesting idea. Need to think more about it.
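
A rough sketch of what the consumer-side deduplication might look like, assuming each message carries a producer id and a sequence number that increases for that producer. `DedupConsumer`, `Envelope` and `accept` are hypothetical names made up for illustration; nothing here is an existing Samza or Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer-side deduplication; not a Samza or Kafka API. */
final class DedupConsumer {

    /** Message with the producer id and sequence number embedded in it. */
    static final class Envelope {
        final String pid;      // id of the producer that wrote the message
        final long seq;        // sequence number, assumed to increase per producer
        final String payload;

        Envelope(String pid, long seq, String payload) {
            this.pid = pid;
            this.seq = seq;
            this.payload = payload;
        }
    }

    // Highest sequence number seen so far for each producer id.
    private final Map<String, Long> lastSeq = new HashMap<>();

    /** Returns true if the message is new, false if it is a replayed duplicate. */
    boolean accept(Envelope m) {
        Long last = lastSeq.get(m.pid);
        if (last != null && m.seq <= last) {
            return false;  // already seen: the producer re-sent it after a failure
        }
        lastSeq.put(m.pid, m.seq);
        return true;
    }
}
```

In a real job the `lastSeq` map would presumably have to be persisted together with the consumer's own offset, otherwise a restarted consumer would forget which sequence numbers it had already seen.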

> The easiest way to extend this to nondeterministic streams is to just make
> those streams deterministic: you just need to log all your 'choices' before
> making them visible to the rest of the system, so you can replay them if
> something goes wrong. For example, if the nondeterminism is from the
> message arrival order, it's enough to take Chris Riccomini's suggestion and
> log whenever the MessageChooser chooses a stream. In the worst case -- say,
> if you have some nondeterministic code that sends messages and mutates
> state -- you need to capture all output and state changes, and make sure
> they're all logged together before continuing.

I don't quite follow. What would that logging look like (assuming it involves requests to
external systems like a remote database)? Would all external communication have to go through
some kind of logging layer?

Nondeterminism can also come from stuff like using the system clock, random number generator,
order of iteration in a hash table, etc. You can say "just don't do that" (and indeed some
frameworks do), but it's hard to enforce, and it would be hard for users to understand why
a single Math.random() throws off the correctness of their entire job. Given that the intention
of exactly-once semantics is to simplify the programming abstraction, I'm not totally convinced
that's a win.
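
For purely local sources such as the clock or a random number generator, "log your choices" could in principle look like the sketch below. `ChoiceLog` and `choose` are hypothetical names, and an in-memory list stands in for what would have to be a durable log. It also illustrates the enforcement problem: it only helps if user code routes every nondeterministic call through the wrapper.

```java
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical record/replay wrapper for nondeterministic values; not a Samza API. */
final class ChoiceLog {
    private final List<Long> log;  // stand-in for a durable, ordered log
    private int cursor = 0;        // position of the next choice to replay

    ChoiceLog(List<Long> log) {
        this.log = log;
    }

    /**
     * Returns the previously logged value if one exists at this position
     * (replay after a failure). Otherwise takes a fresh value from the
     * source and records it before handing it to the caller.
     */
    long choose(LongSupplier source) {
        if (cursor < log.size()) {
            return log.get(cursor++);
        }
        long value = source.getAsLong();
        log.add(value);  // in a real system this write must be durable first
        cursor++;
        return value;
    }
}
```

A task would then call something like `choices.choose(System::currentTimeMillis)` instead of reading the clock directly. A stray direct call to `Math.random()` bypasses the log entirely, and nothing in this sketch can detect that.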

> For me, I think the only thing missing right now is a little more control
> over the incoming messages. For example, the first exactly-once strategy I
> outlined above needed to bootstrap itself from a stream of (input offset,
> output offset) pairs, and then take that most recent input offset and start
> consuming from *there* -- instead of whatever's in Samza's checkpoint
> stream.

Could you perhaps do this by implementing a CheckpointManager (steal the implementation from
the Kafka checkpoint manager, but change it to suit your needs)?

> On the other hand, I'm pretty sure a lower-level mechanism that allowed you
> to 'pull' or 'request' particular input could support both my unusual
> requirements and the existing checkpoint / message chooser / bootstrap
> stream machinery on top.

Something like this was requested on -- if
that is what you're looking for, could you explain your use case on that issue please? It's
always possible to add features if there's a compelling use case, we just want to make sure
they are well thought out.

