samza-dev mailing list archives

From Chris Riccomini <>
Subject Re: Coast, and a few implementation questions
Date Mon, 08 Dec 2014 20:37:16 GMT
Hey Ben,

This is awesome. Your blog post is really well written. I'm forwarding it
on to some folks.

My notes:

1. Please, please, open tickets for anything you need. Your pain on the
offset management is something I've been mulling over. I don't have a
really good idea on how to solve this, but I agree we should figure it out.
2. "But if our task is the only producer for that partition, it's easy to
calculate the value of n… and by starting from the commit, and then
dropping the first n messages of output we create, we can get ourselves
back to a consistent state." This is true only in cases where the
processing is deterministic. There are a shocking number of cases where
this is not true, including some which are very subtle. If you rely on
wall-clock time, for example, your logic will change between processing,
so simply dropping N messages could lead to data loss or duplication.
Other, more nuanced, non-determinism includes deploying a different
(newer) version of code, dependency on external systems, etc.
3. You might want to consider the case where a downstream consumer is
reading from a changelog, as well. This is a use case we've seen pop up
once in a while.
4. One variation of merge log that we'd considered was to batch reads from
an SSP together. This would allow you to shrink your merge log, so you can
say, "Read 100 messages from SSP1, 100 messages from SSP2, etc." This
allows you to drastically shrink the merge log's message count, which
should improve performance. Samza's DefaultMessageChooser supports this
style of batching, but it's off by default.
5. You should have a look at Kafka's transactionality proposal (the
"Transactional Messaging in Kafka" wiki page), as well. This is the only
way that I'm aware of that will support all
use cases (including non-deterministic ones).
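The non-determinism issue in point 2 can be shown with a tiny sketch (hypothetical code, not Samza's API): a transform that embeds something like wall-clock time produces different output on replay, so "drop the first n outputs" no longer lines up with what was already committed. Here a moving counter stands in for the clock:

```java
import java.util.ArrayList;
import java.util.List;

public class ReplayDemo {
    // Stand-in for wall-clock time: an external value that keeps
    // advancing between the original run and the replay.
    static long clock = 0;

    // Deterministic transform: same input, same output, every run.
    static List<String> deterministic(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String msg : input) out.add(msg.toUpperCase());
        return out;
    }

    // Non-deterministic transform: output embeds the "current time",
    // so a replay produces different messages than the original run,
    // and dropping the first n replayed outputs loses or duplicates data.
    static List<String> timestamped(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String msg : input) out.add(msg + "@" + clock++);
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c");
        // Replaying the deterministic transform reproduces the original run.
        System.out.println(deterministic(input).equals(deterministic(input))); // true
        // Replaying the time-dependent transform does not.
        System.out.println(timestamped(input).equals(timestamped(input))); // false
    }
}
```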


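For point 4, the batching style is driven by job configuration. A sketch of what enabling it might look like (check the property name against your Samza version's documentation for the DefaultChooser):

```properties
# Read up to 100 messages in a row from each SystemStreamPartition
# before the chooser moves on, shrinking the merge log's message count.
task.consumer.batch.size=100
```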
On 11/28/14 9:47 AM, "Ben Kirwin" <> wrote:

>Sorry for the late reply on this!
>I ended up writing up some of the tools I'm using to get exactly-once
>semantics.[0] It's still a bit rough, but I decided that another week
>sitting around on my disk wasn't going to do anything to improve it --
>let me know if anything needs clarification.
>That's also not a full answer, since it doesn't explain how those
>primitives are used in `coast`. It's intentionally undocumented for
>the moment, since it's a work in progress, but here's where the
>coordination overhead sits right now:
>- Any transformations / folds / aggregations that require state use
>Samza's existing state / changelog machinery. `coast` tracks a couple
>offsets up/downstream from the state along with the state itself, so
>it adds an extra couple of `long`s per changelog message.
>- If messages are grouped differently in the input and output, `coast`
>adds the source partition/offset to the output messages -- which is
>about 12 bytes of per-message overhead.
>- To avoid duplicate messages, `coast` needs to checkpoint the input
>and output offsets together. Samza doesn't give user code access to
>the offsets, so I've been maintaining this within the task as
>additional state.
>- For jobs with multiple inputs, `coast` needs to remember the order
>in which messages arrived so it can reproduce it if there's a failure.
>This 'merge log' itself is not too expensive, but tracking the current
>offset in that log has been a surprising pain, since it too needs to
>be consistent with the checkpointed offsets. The only way I've found
>so far involves having the same task both produce to and consume from
>the same 'merge log' stream; this is not quite as awful as it sounds,
>but it does create a lot of latency for no good reason.
>So there's a couple places where more flexible offset handling would
>substantially simplify things -- I'll think this over a bit more and
>open a ticket. If I can work those issues out, though, the main cost
>to enabling exactly-once becomes a few extra bytes of per-message
>overhead for certain streams -- and I'm happy to live with that.
>As it stands, though, `coast` *does* implement exactly-once
>semantics on top of the 0.8 Samza branch. I have some more cleanup,
>testing, and polish to do, but I'm hoping to push out a new version of
>`coast` that supports both the exactly-once backend and an
>overhead-free at-least-once version. If all goes well, that should
>happen sometime next week?
>On Fri, Nov 7, 2014 at 1:05 PM, Ben Kirwin <> wrote:
>>>> but I think coast actually has a pretty good shot at making that
>>>>-- it has quite a lot of 'structural' knowledge about the flow of data,
>>>>so it should be able to do a pretty good job of inserting the necessary
>>>>checks / checkpoints / etc. one DAG node at a time.
>>> True. Given that you know exactly what computation is going on, it's
>>> more tractable. I'm curious how you plan to implement exactly once. Do
>>> you have any docs?
>> Nothing worth reading, but I'm hoping to have something reasonable out
>> in the next week or two. When that comes together, I'll send it along.
>Ben Kirwin
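Ben's point about checkpointing input and output offsets together with the task's own state can be sketched as follows (invented names, not coast's actual implementation): by writing the offsets into the same store as the state, one atomic commit covers all three, so a restore after a crash never sees state that disagrees with the offsets.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointedState {
    private final Map<String, String> store = new HashMap<>();

    // All three updates land in the same store, so a restore after a
    // crash sees a mutually consistent (state, input, output) triple.
    void commit(Map<String, String> stateDelta, long inputOffset, long outputOffset) {
        store.putAll(stateDelta);
        store.put("__input_offset", Long.toString(inputOffset));
        store.put("__output_offset", Long.toString(outputOffset));
    }

    long inputOffset()  { return Long.parseLong(store.getOrDefault("__input_offset", "-1")); }
    long outputOffset() { return Long.parseLong(store.getOrDefault("__output_offset", "-1")); }

    public static void main(String[] args) {
        CheckpointedState s = new CheckpointedState();
        s.commit(Map.of("count", "7"), 42L, 17L);
        System.out.println(s.inputOffset() + " " + s.outputOffset()); // 42 17
    }
}
```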

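The 'merge log' idea for multi-input jobs can also be sketched in a few lines (hypothetical code): the log records only which input each message came from, and a replay consumes that log to interleave the inputs in exactly the original arrival order.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeLogDemo {
    public static void main(String[] args) {
        // Two input streams, each individually ordered.
        List<String> a = List.of("a1", "a2");
        List<String> b = List.of("b1");

        // The original run happened to see: a1, b1, a2. The merge log
        // stores only the source of each message, not its payload.
        List<String> mergeLog = List.of("A", "B", "A");

        // Replay: walk the merge log and pull the next message from
        // whichever input it names, reproducing the original interleaving.
        int ai = 0, bi = 0;
        List<String> replayed = new ArrayList<>();
        for (String src : mergeLog) {
            replayed.add(src.equals("A") ? a.get(ai++) : b.get(bi++));
        }
        System.out.println(replayed); // [a1, b1, a2]
    }
}
```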