samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ben Kirwin <>
Subject Re: Inter-stream time-alignment
Date Sat, 06 Dec 2014 00:54:04 GMT
> In order to visit each input in the correct order, the event-type-specific
> streams need to be interleaved/time-aligned for each summarizer Task based
> on embedded timestamps. However, Samza offers no inter-stream ordering
> semantics, so this ordering would seem to be the job of a custom
> MessageChooser. But I don't see how a MessageChooser can do this without
> one additional bit of context: when we haven't been offered an item from
> one or more input streams, we need to know whether the missing stream(s)
> are "at head," to decide whether we must await an upcoming message for
> timestamp comparison.

Unless I misunderstand your design, I don't think knowing whether the
stream is 'at head' actually helps you here.

Let's say the upstream task is loading data into two topics:
'level-changed' and 'purchase-completed'. Suppose a Kafka node goes
down while it's producing a new batch of messages, and the
'purchase-completed' events in that batch are written, but the
'level-changed' events are not. The downstream Samza task will see /
process those 'purchase-completed' events, but there's no way for it
to know that it should expect some preceding 'level-changed' events --
since they never made it into Kafka, Samza think's it's 'caught up'.
(There are some other race-type problems you can get, but I think
that's the most obvious.)

Normally, I suggest that whenever you care about the relative ordering
of some data, you try and put that data in the same partition of the
same topic. When messages are in the same partition, the ordering's
obvious -- but as you've noticed, it gets a lot trickier to
re-establish order when Kafka doesn't enforce it. In your case, it
seems like partitioning by shard id should work; consumers can just
filter out the messages they don't care about.

In your case, another option is to add periodic 'marker' messages with
the current timestamp to each topic/partition. When your samza job
gets a marker as input, it can be sure that the upstream job will
never send an event with a smaller timestamp on that partition. When
your task sees a 'purchase-completed' event, it just needs to buffer
until it sees either a 'level-changed' event or a marker with a >=
timestamp -- and then it can be confident it knows the player's level
at that moment in time. (Still, I suggest the first option if you can
swing it -- it's a whole lot harder to mess up.)

Ben Kirwin

View raw message