samza-dev mailing list archives

From josh gruenberg <>
Subject Re: Inter-stream time-alignment
Date Thu, 11 Dec 2014 00:57:17 GMT
Thanks, Ben. Your explanations are very clear.

I did follow your earlier suggestion that we simply feed the entire muxed
stream, partitioned by user, to each job, and have them filter for their
events of interest. I do think this could be made to work. I have a couple
reservations about this approach, but I think we can roll with it for now
and tackle more sophisticated solutions as needs arise.

For the record, my reservations regarding all processing consuming the
firehose "muxed" stream are:

   1. the cost/performance of replaying ALL events just to regenerate
   output from a small subset (not sure how significant this is without
   real-world experience), and
   2. the inability to tee off reusable summarization streams for reuse
   between multiple downstream tasks (ie, a DAG of tasks) without tackling the
   multi-stream ordering problem

Given certain assumptions about how the stream-processing works, I think
this could all be addressed via a fairly straightforward
timestamp/heartbeat system that could be wrapped around tasks in a
workflow. I'd consider filing a JIRA ticket for this (if there isn't
already something related), but I don't seem to be able to reach at the moment..?

I have some fairly fleshed-out ideas about how such a system might work (for
my use-case, anyway), so I'll dump my thoughts here for community review;
apologies if this is flawed, or already well understood/obvious to the

From the Task perspective, a reusable system for inter-stream alignment
could involve:

   - declaring logical timestamps/seqnos (or *ranges*) for each message
   (possibly via OutgoingMessageEnvelope/MessageCollector, or via an
   application-provided MessageTimestampInterpreter interface)
   - declaring timestamped output-streams, so that the alignment-system can
   generate outbound heartbeats as appropriate

The system might then:

   1. provide a MessageChooser that waits until messages arrive for ALL
   inbound streams prior to choosing the eldest non-heartbeat for processing
   2. update a local notion of "stream-time" whenever processing begins on
   an inbound message, and whenever a message is produced to an output-stream
   3. monitor outbound messages to record the last timestamp produced to
   each stream
   4. whenever the "stream-time" crosses some configurable
   heartbeat-interval, inject a heartbeat message (ie, a message with a
   timestamp but no payload) into all registered downstream topics to which no
   message has been published since the previous interval
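
As a rough illustration of the four steps above, here is a minimal sketch in
Python (chosen only for brevity; a real Samza task would be Java or Scala).
Envelope, AlignedChooser and HeartbeatEmitter are hypothetical names, not
Samza classes, and nothing here calls the Samza API; update/choose are simply
named after the MessageChooser methods. It assumes integer timestamps and
treats an envelope with no payload as a heartbeat.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Envelope:
    stream: str
    timestamp: int
    payload: Optional[Any] = None  # assumption: None marks a heartbeat

    @property
    def is_heartbeat(self) -> bool:
        return self.payload is None


class AlignedChooser:
    """Step 1: release the eldest message only once every input has one pending."""

    def __init__(self, streams: List[str]):
        self.pending: Dict[str, Optional[Envelope]] = {s: None for s in streams}

    def update(self, env: Envelope) -> None:
        # Hold at most one envelope per input stream.
        assert self.pending[env.stream] is None
        self.pending[env.stream] = env

    def choose(self) -> Optional[Envelope]:
        # If any input has nothing pending we must wait: it may still
        # deliver something older than what we are holding.
        if any(e is None for e in self.pending.values()):
            return None
        # On equal timestamps, real messages sort before heartbeats.
        eldest = min(self.pending.values(),
                     key=lambda e: (e.timestamp, e.is_heartbeat))
        self.pending[eldest.stream] = None
        # A heartbeat is consumed silently; its stream must then supply
        # a newer envelope before anything else can be released.
        return None if eldest.is_heartbeat else eldest


class HeartbeatEmitter:
    """Steps 2-4: track stream-time and emit heartbeats on idle outputs."""

    def __init__(self, outputs: List[str], interval: int):
        self.interval = interval
        self.last_sent: Dict[str, Optional[int]] = {s: None for s in outputs}
        self.last_tick: Optional[int] = None

    def on_output(self, stream: str, timestamp: int) -> None:
        # Step 3: record the last timestamp produced to each stream.
        self.last_sent[stream] = timestamp

    def advance(self, stream_time: int) -> List[Envelope]:
        # Step 2: called whenever the local stream-time moves forward.
        if self.last_tick is None:
            self.last_tick = stream_time
            return []
        if stream_time - self.last_tick < self.interval:
            return []
        # Step 4: heartbeat every output that was idle since the last tick.
        beats = [Envelope(s, stream_time)
                 for s, ts in self.last_sent.items()
                 if ts is None or ts < self.last_tick]
        for b in beats:
            self.last_sent[b.stream] = stream_time
        self.last_tick = stream_time
        return beats
```

Note that the chooser only makes progress while every input keeps delivering
either messages or heartbeats, which is exactly why the emitter side is needed.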

(Fragile) assumptions:

   1. partitions of all timestamped streams maintain strict
   2. all outbound messages from a task are produced in monotonic
   3. the timestamp-ranges of all outbound messages fall within the
   timestamp-range of the inbound message that triggered them
   4. stream partitioning is retained/consistent end-to-end

Open questions in my mind:

   1. Initialization of the heartbeats at startup is an area of interest.
   Specifically, I think a property of each timestamped stream might be its
   logical "beginning-of-time" (which could be represented as an up-front
   heartbeat with that timestamp) ... to avoid pointless backfilling of
   timestamps into the past
   2. A solution for handling when streams are re-partitioned (likely
   involves buffering of multiple messages per input-stream?)

Cheers, hope this is helpful,

I don't think it has to be quite so tricky. Some quick notes:

'Preserving order' is a really nice way to think about these problems.
At the beginning of your job, you have an ordered series of files for
each shard, and each file has an ordered series of records. (Here,
I'll assume that everything is sharded by hashing the user id.)
There's a few interesting relationships here:

- If two events for a user are always in the same shard, then all
these events already have a 'total order'. This order is semantically
important, so you want to preserve it all the way through your stream
processing pipeline. The easiest way to preserve the ordering of two
messages in Kafka is to write them to the same topic / partition.
- Ordering isn't always meaningful -- for example, two events for
unrelated users that happened in the same shard are *also* ordered,
but since all these users have in common is that they hashed to the
same shard, you probably don't care. In that case, you're free to
*not* preserve the ordering and write them to different topics /
partitions.
- For events from totally different shards, there's no 'existing'
ordering, so there's nothing to preserve. Again, you can partition
these however you like.

Partitioning is a lot like sharding -- it's important that all data
for the same user ends up in the same shard/partition, but it's
totally fine if data for a bunch of unrelated users is in there too.

So assuming all that is right, one solution is to put the data for all
event types in a *single* topic, partitioned by user id. Each
partition will have data for a bunch of users, but all the data for a
particular user is in just one partition. You can have as many or as
few partitions as you want. If you do that, analyses like "what level
was this user when they made that purchase?" are trivial; instead of
worrying about joins, you just have your job filter out the events you
don't care about. You'll still have to regroup / etc. for jobs that
aggregate across a bunch of different users, but that's normal: most
users aren't from the same shard, so there was no strict ordering
between them in the first place.
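
To make that concrete, here is a minimal sketch in Python (not Samza code).
The function names, the tuple layout and the use of crc32 as the hash are
assumptions for illustration only; the event-type names come from earlier in
the thread.

```python
import zlib
from typing import Any, Dict, Iterable, List, Optional, Tuple

Event = Tuple[str, str, Any]  # (user_id, event_type, value); hypothetical layout


def partition_for(user_id: str, num_partitions: int) -> int:
    # Any stable hash works, as long as a given user always lands in
    # the same partition. crc32 is only a stand-in here.
    return zlib.crc32(user_id.encode("utf-8")) % num_partitions


def level_at_purchase(partition_events: Iterable[Event]) -> List[Tuple[str, Any, Optional[Any]]]:
    """Consume one partition in order and pair each purchase with the
    user's level at that moment. No join or buffering is needed, because
    all of a user's events arrive in their original order."""
    level: Dict[str, Any] = {}
    out = []
    for user_id, event_type, value in partition_events:
        if event_type == "level-changed":
            level[user_id] = value
        elif event_type == "purchase-completed":
            out.append((user_id, value, level.get(user_id)))
        # Every other event type is simply filtered out.
    return out
```

The per-user dictionary is the only state the job has to keep, and it never
needs to look at another partition.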

There's a bunch of possible variations on this, but that's the basic
idea I was getting at last week.

On Mon, Dec 8, 2014 at 3:50 PM, josh gruenberg <> wrote:
> Hi guys,
> Thanks for the quick replies, and for your attention to the correctness
> beyond my specific questions!
> After a weekend of reflection, I can now see just how much I DIDN'T
> understand before about time and order in stream-processing :-).
> You're right to scrutinize the ordering guarantees given multiple
> fanning out the upstream partitions. This has me reconsidering my entire
> topology. My idea as of Friday morning was to generate a topic in kafka
> containing input-file URIs, partitioned by upstream shard-id, and to then
> have a Task explode the files into type-collated topics, partitioned by
> user-id. I was mistakenly imagining that, because the upstream shards are
> already partitioned by user-id, this would somehow result in the upstream
> partitioning/ordering being preserved as long as downstream partitions were
> also based on user-id. I suspect now that this would only happen if the
> partitioning formula is identical (murmur3, anyone? ;-), and the number of
> partitions is also consistent end-to-end! Does that sound right to you?
> Anyway, I believe my initial design was flawed, as you suspected.
> Ben's suggestion that we instead partition by upstream shard-id is
> intriguing... But I'm confused about this: I can declare the shard-id as
> the partitionKey with each Message, but will this guarantee that no two
> shards will ever be MERGED by the partitioner? At the very least, I'd
> expect that we'd need to ensure that the cardinality of the partitions
> matched the upstream shard-count end-to-end... If not (ie, if there are
> fewer partitions), I think we'd still end up with out-of-order events in
> any partitions that merge multiple upstream shards, unless we take steps to
> sort them. Is this accurate?
> Now, just to stir the pot: the upstream OLTP database is heavily bucketed
> to achieve fine-grained distribution: my source has *1024* logical shards!
> I think this might need to be stepped down with an initial merge
> Then, as per my original question, the need to merge-sort just reappears
> whenever we want to consume multiple streams, and I don't see a way to
> avoid injecting (and propagating!) timestamp-heartbeats to maintain global
> order through a merge without blocking for "drained" streams. Moreover, if
> we want to alter the partitioning downstream, we'll need to do a more
> complicated 2-step shuffle: first merging upstream partitions together,
> then buffering multiple messages until we have data from all upstream
> partitions AND streams to determine what's next..? (This thinking is
> half-baked, please correct me if I'm confused.)
> Similarly, sharing stateful Stores outside of their generating Task (for
> example, to join a user's current "level" with multiple event-types/Tasks)
> seems troubling, as I'm not sure how to coordinate timestamps with the
> Store's changelog for this.
> Overwhelming. Am I overthinking this? Do you think I can get this working
> without implementing full heartbeat-plumbing and custom
> buffering/merge-sorting for every Task? I'm not seeing a robust
> alternative...
> Thanks for your insights; I appreciate any suggestions you can offer!
> Cheers,
> -josh
> On Fri, Dec 5, 2014 at 5:12 PM, Chris Riccomini <
>> wrote:
>> Hey Ben,
>> This is a good point. I was thinking more about this as well. Another
>> example is when a message is sent but not yet replicated to the rest of
>> the ISR in Kafka. In such a case, the Samza job would think it's at head
>> (since the last message it received == the high watermark), but there are
>> still messages in-flight that might be < the timestamp for the stream
>> you're joining against.
>> > In your case, it seems like partitioning by shard id should work
>> Yea, this is an interesting idea. This should work provided that the join
>> is happening within the same shard (in the example given, that was the
>> case), and there is only one producer for the shard, which is properly
>> interleaving the messages from the two strictly ordered files (assuming
>> the files are one per-table).
>> Cheers,
>> Chris
>> On 12/5/14 4:54 PM, "Ben Kirwin" <> wrote:
>> >> In order to visit each input in the correct order, the
>> >>event-type-specific
>> >> streams need to be interleaved/time-aligned for each summarizer Task
>> >>based
>> >> on embedded timestamps. However, Samza offers no inter-stream ordering
>> >> semantics, so this ordering would seem to be the job of a custom
>> >> MessageChooser. But I don't see how a MessageChooser can do this
>> >> one additional bit of context: when we haven't been offered an item
>> >> one or more input streams, we need to know whether the missing
>> >> are "at head," to decide whether we must await an upcoming message for
>> >> timestamp comparison.
>> >
>> >Unless I misunderstand your design, I don't think knowing whether the
>> >stream is 'at head' actually helps you here.
>> >
>> >Let's say the upstream task is loading data into two topics:
>> >'level-changed' and 'purchase-completed'. Suppose a Kafka node goes
>> >down while it's producing a new batch of messages, and the
>> >'purchase-completed' events in that batch are written, but the
>> >'level-changed' events are not. The downstream Samza task will see /
>> >process those 'purchase-completed' events, but there's no way for it
>> >to know that it should expect some preceding 'level-changed' events --
>> >since they never made it into Kafka, Samza thinks it's 'caught up'.
>> >(There are some other race-type problems you can get, but I think
>> >that's the most obvious.)
>> >
>> >Normally, I suggest that whenever you care about the relative ordering
>> >of some data, you try and put that data in the same partition of the
>> >same topic. When messages are in the same partition, the ordering's
>> >obvious -- but as you've noticed, it gets a lot trickier to
>> >re-establish order when Kafka doesn't enforce it. In your case, it
>> >seems like partitioning by shard id should work; consumers can just
>> >filter out the messages they don't care about.
>> >
>> >In your case, another option is to add periodic 'marker' messages with
>> >the current timestamp to each topic/partition. When your samza job
>> >gets a marker as input, it can be sure that the upstream job will
>> >never send an event with a smaller timestamp on that partition. When
>> >your task sees a 'purchase-completed' event, it just needs to buffer
>> >until it sees either a 'level-changed' event or a marker with a >=
>> >timestamp -- and then it can be confident it knows the player's level
>> >at that moment in time. (Still, I suggest the first option if you can
>> >swing it -- it's a whole lot harder to mess up.)
>> >
>> >--
>> >Ben Kirwin
>> >

Ben Kirwin
