samza-dev mailing list archives

From josh gruenberg <>
Subject Re: Inter-stream time-alignment
Date Mon, 08 Dec 2014 20:50:29 GMT
Hi guys,

Thanks for the quick replies, and for your attention to the correctness
beyond my specific questions!

After a weekend of reflection, I can now see just how much I DIDN'T
understand before about time and order in stream-processing :-).

You're right to scrutinize the ordering guarantees given multiple producers
fanning out the upstream partitions. This has me reconsidering my entire
topology. My idea as of Friday morning was to generate a topic in Kafka
containing input-file URIs, partitioned by upstream shard-id, and to then
have a Task explode the files into type-collated topics, partitioned by
user-id. I was mistakenly imagining that, because the upstream shards are
already partitioned by user-id, the upstream partitioning and ordering would
somehow be preserved as long as the downstream partitions were also keyed on
user-id. I now suspect that this would only hold if the partitioning formula
were identical (murmur3, anyone? ;-) and the number of partitions were also
the same end-to-end! Does that sound right to you?
Anyway, I believe my initial design was flawed, as you suspected.
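
To make the partition-count half of that question concrete, here is a minimal
Python sketch. It assumes plain hash-mod partitioning, with `zlib.crc32` standing
in for whatever hash the real partitioner uses (it is neither murmur3 nor any
particular Kafka client's default). The helper names `partition_for` and `spread`
are hypothetical. With the same function and the same partition count, every
upstream partition maps onto exactly one downstream partition. If the count
changes, each upstream partition's users scatter across several downstream
partitions, so no single log carries their relative order any more. Even with a
matching mapping, order within a partition still depends on how many producers
write to it and how they interleave.

```python
import zlib
from typing import Dict, Iterable, Set


def partition_for(key: str, num_partitions: int) -> int:
    """Hash-mod partitioning; crc32 is only a deterministic stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def spread(user_ids: Iterable[str], upstream_n: int, downstream_n: int) -> Dict[int, Set[int]]:
    """For each upstream partition, the set of downstream partitions its users land in."""
    result: Dict[int, Set[int]] = {}
    for uid in user_ids:
        up = partition_for(uid, upstream_n)
        result.setdefault(up, set()).add(partition_for(uid, downstream_n))
    return result


users = [f"user-{i}" for i in range(1000)]
same = spread(users, 8, 8)     # same formula, same partition count
moved = spread(users, 8, 12)   # same formula, different partition count
print(max(len(s) for s in same.values()))   # 1: each upstream partition stays whole
print(max(len(s) for s in moved.values()))  # more than 1 (up to 3 for 8 -> 12)
```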

Ben's suggestion that we instead partition by upstream shard-id is
intriguing... But I'm confused about this: I can declare the shard-id as
the partitionKey with each Message, but will this guarantee that no two
shards will ever be MERGED by the partitioner? At the very least, I'd
expect that we'd need to ensure that the number of partitions matched the
upstream shard-count end-to-end... If not (i.e., if there are fewer
partitions than shards), I think we'd still end up with out-of-order events
in any partition that merges multiple upstream shards, unless we take steps
to sort them. Is this accurate?
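
The merging question is essentially a pigeonhole argument: with fewer partitions
than shards, some partition must carry more than one shard. The minimal Python
sketch below compares two hypothetical assignment schemes. The first is an
explicit `shard_id % num_partitions` mapping, which merges shards in a
predictable pattern. The second hashes the shard-id, again using `zlib.crc32` as
a stand-in hash, and this can merge two shards even when the partition count
equals the shard count.

```python
import zlib
from collections import defaultdict
from typing import Callable, Dict, List


def group_shards(num_shards: int, num_partitions: int,
                 assign: Callable[[int, int], int]) -> Dict[int, List[int]]:
    """Group shard ids by the partition the given assignment sends them to."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for shard in range(num_shards):
        groups[assign(shard, num_partitions)].append(shard)
    return dict(groups)


def modulo(shard: int, n: int) -> int:
    # Hypothetical explicit assignment: predictable, collision-free when n >= shards.
    return shard % n


def hashed(shard: int, n: int) -> int:
    # Hypothetical hashed assignment: crc32 stands in for the real partitioner hash.
    return zlib.crc32(str(shard).encode("utf-8")) % n


by_mod = group_shards(1024, 32, modulo)
print(len(by_mod), by_mod[0][:3])  # 32 [0, 32, 64]: 32 shards share each partition

by_hash = group_shards(1024, 1024, hashed)
merged = [p for p, shards in by_hash.items() if len(shards) > 1]
print(len(merged) > 0)  # True: hash collisions merge shards despite equal counts
```

Within any merged partition, events from different shards interleave in
arrival order rather than timestamp order, so that partition would need its own
merge-sort.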

Now, just to stir the pot: the upstream OLTP database is heavily bucketed
to achieve fine-grained distribution: my source has *1024* logical shards!
I think this might need to be stepped down with an initial merge process...
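
One way to picture that initial step-down merge is as a k-way merge. This sketch
assumes that each shard's events are already sorted by timestamp and are
available as finite sequences (say, one batch of input files). It uses Python's
`heapq.merge` and groups a hypothetical `fan_in` of shards into each output
stream. The `(timestamp, payload)` event shape is also an assumption.

```python
import heapq
from typing import Iterable, List, Sequence, Tuple

Event = Tuple[int, str]  # (timestamp, payload): a hypothetical event shape


def merge_sorted(streams: Iterable[Iterable[Event]]) -> List[Event]:
    """K-way merge of streams that are each already sorted by timestamp."""
    return list(heapq.merge(*streams, key=lambda e: e[0]))


def step_down(shards: Sequence[Sequence[Event]], fan_in: int) -> List[List[Event]]:
    """Merge each run of `fan_in` consecutive shards into one time-ordered stream."""
    return [merge_sorted(shards[i:i + fan_in]) for i in range(0, len(shards), fan_in)]


shards = [
    [(1, "a1"), (4, "a4")],
    [(2, "b2"), (3, "b3")],
    [(5, "c5")],
    [(0, "d0")],
]
print(step_down(shards, fan_in=2))
# [[(1, 'a1'), (2, 'b2'), (3, 'b3'), (4, 'a4')], [(0, 'd0'), (5, 'c5')]]
```

The weakness is visible in `heapq.merge` itself. After emitting an event, it
must pull the next one from that same input before it can choose again. On a
live stream with no data yet, that pull would block, which is the
drained-stream problem described below.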

Then, as per my original question, the need to merge-sort just reappears
whenever we want to consume multiple streams, and I don't see a way to
avoid injecting (and propagating!) timestamp-heartbeats to maintain global
order through a merge without blocking for "drained" streams. Moreover, if
we want to alter the partitioning downstream, we'll need to do a more
complicated 2-step shuffle: first merging upstream partitions together,
then buffering multiple messages until we have data from all upstream
partitions AND streams to determine what's next? (This thinking is
half-baked, please correct me if I'm confused.)
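
The heartbeat idea can be sketched as a small watermark-based merger. This is a
minimal Python sketch, not a Samza API: `WatermarkMerger`, `on_event` and
`on_heartbeat` are hypothetical names. The sketch assumes that each input is
individually time-ordered and that a heartbeat carrying timestamp T promises no
later event on that input will have a smaller timestamp. Buffered events are
released once they are at or below the slowest input's watermark, and equal
timestamps are treated as unordered.

```python
import heapq
import itertools
from typing import Dict, Iterable, List, Tuple


class WatermarkMerger:
    """Merges individually time-ordered inputs into one time-ordered output.

    Hypothetical contract (not a Samza API): each input delivers events in
    non-decreasing timestamp order, and a heartbeat with timestamp T promises
    that no later event on that input has a timestamp below T. Events with
    equal timestamps are treated as unordered.
    """

    def __init__(self, inputs: Iterable[str]) -> None:
        self._watermark: Dict[str, float] = {name: float("-inf") for name in inputs}
        self._buffer: List[Tuple[float, int, str, object]] = []
        self._seq = itertools.count()  # tie-breaker, so payloads are never compared

    def on_event(self, stream: str, ts: float, payload: object) -> List[Tuple[float, str, object]]:
        self._advance(stream, ts)  # an event is also progress on its own input
        heapq.heappush(self._buffer, (ts, next(self._seq), stream, payload))
        return self._release()

    def on_heartbeat(self, stream: str, ts: float) -> List[Tuple[float, str, object]]:
        self._advance(stream, ts)
        return self._release()

    def _advance(self, stream: str, ts: float) -> None:
        # Watermarks only move forward; a stale heartbeat changes nothing.
        self._watermark[stream] = max(self._watermark[stream], ts)

    def _release(self) -> List[Tuple[float, str, object]]:
        # Anything at or below the slowest input's watermark is safe to emit.
        low = min(self._watermark.values())
        out = []
        while self._buffer and self._buffer[0][0] <= low:
            ts, _, stream, payload = heapq.heappop(self._buffer)
            out.append((ts, stream, payload))
        return out


m = WatermarkMerger(["level-changed", "purchase-completed"])
print(m.on_event("purchase-completed", 10, "p1"))  # [] (level-changed has shown no progress)
print(m.on_event("level-changed", 5, "L2"))        # [(5, 'level-changed', 'L2')]
print(m.on_heartbeat("level-changed", 12))         # [(10, 'purchase-completed', 'p1')]
```

Output latency in this design is governed by the slowest input. An idle input
that sends no heartbeats holds everything back indefinitely, which is why the
heartbeats have to be injected and propagated rather than inferred.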

Similarly, sharing stateful Stores outside of their generating Task (for
example, to join a user's current "level" with multiple event-types/Tasks)
seems troubling, as I'm not sure how to coordinate timestamps with the
Store's changelog for this.
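
One way to make a shared store usable from other Tasks is to keep history
rather than only the latest value, and to track how far the writer has
progressed. The minimal Python sketch below uses a hypothetical
`VersionedLevelStore`, which is not Samza's key-value store API. Readers ask for
a user's level as of a timestamp, and should only trust the answer once the
store's watermark has reached that timestamp (equal timestamps treated as
unordered).

```python
import bisect
from typing import Dict, List, Optional


class VersionedLevelStore:
    """Hypothetical store keeping every (timestamp, level) per user."""

    def __init__(self) -> None:
        self._times: Dict[str, List[int]] = {}
        self._levels: Dict[str, List[int]] = {}
        self._watermark = float("-inf")  # the writer has applied updates up to here

    def put(self, user: str, ts: int, level: int) -> None:
        # Assumes the writer applies all updates in non-decreasing timestamp order.
        if ts < self._watermark:
            raise ValueError("out-of-order write")
        self._times.setdefault(user, []).append(ts)
        self._levels.setdefault(user, []).append(level)
        self._watermark = ts

    def heartbeat(self, ts: int) -> None:
        # The writer promises no future put will carry a timestamp below ts.
        self._watermark = max(self._watermark, ts)

    def is_complete_through(self, ts: int) -> bool:
        return ts <= self._watermark

    def level_as_of(self, user: str, ts: int) -> Optional[int]:
        """Latest level written at or before ts, or None if there is none."""
        times = self._times.get(user, [])
        i = bisect.bisect_right(times, ts)
        return self._levels[user][i - 1] if i else None


s = VersionedLevelStore()
s.put("u1", 5, 2)
s.put("u1", 20, 3)
print(s.level_as_of("u1", 19), s.level_as_of("u1", 25))  # 2 3
print(s.is_complete_through(25))  # False: the writer has only reached 20
```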

Overwhelming. Am I overthinking this? Do you think I can get this working
without implementing full heartbeat-plumbing and custom
buffering/merge-sorting for every Task? I'm not seeing a robust

Thanks for your insights; I appreciate any suggestions you can offer!


On Fri, Dec 5, 2014 at 5:12 PM, Chris Riccomini <> wrote:

> Hey Ben,
> This is a good point. I was thinking more about this as well. Another
> example is when a message is sent but not yet replicated to the rest of
> the ISR in Kafka. In such a case, the Samza job would think it's at head
> (since the last message it received == the high watermark), but there are
> still messages in-flight that might be < the timestamp for the stream
> you're joining against.
> > In your case, it seems like partitioning by shard id should work
> Yeah, this is an interesting idea. This should work provided that the join
> is happening within the same shard (in the example given, that was the
> case), and there is only one producer for the shard, which is properly
> interleaving the messages from the two strictly ordered files (assuming
> the files are one per-table).
> Cheers,
> Chris
> On 12/5/14 4:54 PM, "Ben Kirwin" <> wrote:
> >> In order to visit each input in the correct order, the
> >> event-type-specific streams need to be interleaved/time-aligned for each
> >> summarizer Task based on embedded timestamps. However, Samza offers no
> >> inter-stream ordering
> >> semantics, so this ordering would seem to be the job of a custom
> >> MessageChooser. But I don't see how a MessageChooser can do this without
> >> one additional bit of context: when we haven't been offered an item from
> >> one or more input streams, we need to know whether the missing stream(s)
> >> are "at head," to decide whether we must await an upcoming message for
> >> timestamp comparison.
> >
> >Unless I misunderstand your design, I don't think knowing whether the
> >stream is 'at head' actually helps you here.
> >
> >Let's say the upstream task is loading data into two topics:
> >'level-changed' and 'purchase-completed'. Suppose a Kafka node goes
> >down while it's producing a new batch of messages, and the
> >'purchase-completed' events in that batch are written, but the
> >'level-changed' events are not. The downstream Samza task will see /
> >process those 'purchase-completed' events, but there's no way for it
> >to know that it should expect some preceding 'level-changed' events --
> >since they never made it into Kafka, Samza thinks it's 'caught up'.
> >(There are some other race-type problems you can get, but I think
> >that's the most obvious.)
> >
> >Normally, I suggest that whenever you care about the relative ordering
> >of some data, you try and put that data in the same partition of the
> >same topic. When messages are in the same partition, the ordering's
> >obvious -- but as you've noticed, it gets a lot trickier to
> >re-establish order when Kafka doesn't enforce it. In your case, it
> >seems like partitioning by shard id should work; consumers can just
> >filter out the messages they don't care about.
> >
> >In your case, another option is to add periodic 'marker' messages with
> >the current timestamp to each topic/partition. When your samza job
> >gets a marker as input, it can be sure that the upstream job will
> >never send an event with a smaller timestamp on that partition. When
> >your task sees a 'purchase-completed' event, it just needs to buffer
> >until it sees either a 'level-changed' event or a marker with a >=
> >timestamp -- and then it can be confident it knows the player's level
> >at that moment in time. (Still, I suggest the first option if you can
> >swing it -- it's a whole lot harder to mess up.)
> >
> >--
> >Ben Kirwin
> >
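
Ben's first suggestion can be sketched in a few lines of Python: both event
types go in the same partition, keyed by shard id, and consumers skip whatever
they don't need. The `(type, timestamp, user, value)` message shape and the
function name are hypothetical, and so is the extra `session-start` type. The
sketch assumes one producer per shard writing the partition in order, as Chris
notes. Because the task reads a single log in offset order, a purchase sees
every level change written before it, with no buffering or markers.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# (type, timestamp, user, value): a hypothetical message shape for one shard's partition.
Message = Tuple[str, int, str, object]


def annotate_purchases(partition: Iterable[Message]) -> List[Tuple[int, str, object, Optional[object]]]:
    """Read one partition in offset order and attach the current level to each purchase."""
    level: Dict[str, object] = {}
    out = []
    for mtype, ts, user, value in partition:
        if mtype == "level-changed":
            level[user] = value
        elif mtype == "purchase-completed":
            out.append((ts, user, value, level.get(user)))
        # Any other message type in the shard's partition is simply skipped.
    return out


partition = [
    ("level-changed", 1, "u1", 1),
    ("purchase-completed", 2, "u1", "sword"),
    ("session-start", 3, "u1", None),
    ("level-changed", 4, "u1", 2),
    ("purchase-completed", 5, "u1", "shield"),
    ("purchase-completed", 6, "u2", "potion"),
]
print(annotate_purchases(partition))
# [(2, 'u1', 'sword', 1), (5, 'u1', 'shield', 2), (6, 'u2', 'potion', None)]
```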

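Ben's marker alternative can be sketched in the same way. This minimal Python
sketch assumes a single, time-ordered 'level-changed' partition and a
time-ordered 'purchase-completed' partition. `PurchaseLevelJoiner` and its
methods are hypothetical names. A marker with timestamp T means no later
'level-changed' event will carry a timestamp below T. Pending purchases at or
below the level stream's watermark can therefore be emitted with the player's
level. A purchase with the same timestamp as a level change gets the new level,
which is one arbitrary choice for ties.

```python
from collections import deque
from typing import Callable, Deque, Dict, List, Optional, Tuple

Joined = Tuple[int, str, str, Optional[int]]  # (ts, user, item, level at ts)


class PurchaseLevelJoiner:
    """Buffers purchases until the level-changed input is known complete through them."""

    def __init__(self) -> None:
        self._level: Dict[str, int] = {}
        self._level_watermark = float("-inf")
        self._pending: Deque[Tuple[int, str, str]] = deque()  # arrival = timestamp order

    def on_purchase(self, ts: int, user: str, item: str) -> List[Joined]:
        self._pending.append((ts, user, item))
        return self._release(lambda p_ts: p_ts <= self._level_watermark)

    def on_level_changed(self, ts: int, user: str, level: int) -> List[Joined]:
        # Purchases strictly before this change are now final with the old level.
        out = self._release(lambda p_ts: p_ts < ts)
        self._level[user] = level
        self._level_watermark = max(self._level_watermark, ts)
        # Purchases at exactly ts get the new level (an arbitrary tie rule).
        return out + self._release(lambda p_ts: p_ts <= self._level_watermark)

    def on_marker(self, ts: int) -> List[Joined]:
        # No later level-changed event will carry a timestamp below ts.
        self._level_watermark = max(self._level_watermark, ts)
        return self._release(lambda p_ts: p_ts <= self._level_watermark)

    def _release(self, ready: Callable[[int], bool]) -> List[Joined]:
        out: List[Joined] = []
        while self._pending and ready(self._pending[0][0]):
            ts, user, item = self._pending.popleft()
            out.append((ts, user, item, self._level.get(user)))
        return out


j = PurchaseLevelJoiner()
j.on_level_changed(1, "u1", 1)
j.on_purchase(3, "u1", "sword")        # buffered: level stream only known through 1
j.on_purchase(6, "u1", "shield")       # buffered
print(j.on_level_changed(5, "u1", 2))  # [(3, 'u1', 'sword', 1)]
print(j.on_marker(7))                  # [(6, 'u1', 'shield', 2)]
```

With several 'level-changed' partitions, the watermark would have to be the
minimum across them. That brings back the per-Task heartbeat plumbing from the
original question.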