samza-dev mailing list archives

From Chris Riccomini <>
Subject Re: Inter-stream time-alignment
Date Fri, 05 Dec 2014 22:52:12 GMT
Hey Josh,

Your summary is very clear. First, a few questions:

> my application ... generates transactional event-logs that are spooled
>to flat-files. Each file contains all log-entries produced within a
>specific time-span for a shard of users (partitioned by hash of user-id).
>Within each file, entries are in strict timestamp order, with all
>event-types muxed together.

How are you thinking about loading these flat-files into Kafka? The reason
I ask is because I want to discern whether, for a single Kafka topic, you
have multiple writers, or just a single writer. This distinction is
important because, even if your flat-files are strictly ordered, if you
have multiple flat-files on different machines getting interleaved into a
single Kafka topic, the strict ordering is lost. To guarantee strict
ordering in this case, you basically need a single flat-file at a time
being sent to each Kafka topic, all from one machine (a single clock).
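To make the interleaving risk concrete, here is a small runnable sketch (Python, purely illustrative; the file contents and timestamps are made up) showing that two individually-ordered files interleaved by independent writers lose global order, while a single writer merging by timestamp preserves it:

```python
import heapq

# Two flat-files, each strictly ordered by timestamp.
file_a = [1, 4, 9]
file_b = [2, 3, 10]

def interleaved(a, b):
    # Two independent producers racing into one topic: one possible
    # arrival order at the broker.
    out = []
    for x, y in zip(a, b):
        out += [x, y]
    return out

def single_writer_merge(a, b):
    # One machine merging both files by timestamp before producing.
    return list(heapq.merge(a, b))

assert interleaved(file_a, file_b) != sorted(file_a + file_b)  # order lost
assert single_writer_merge(file_a, file_b) == sorted(file_a + file_b)
```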

I'll move forward assuming that each DB/table exists on just one machine,
and its dumps are being sent to a unique Kafka topic (per table) by just a
single writer. Thus, you would have strict ordering within each Kafka topic.

Related: are these Kafka topics partitioned, or single-partition?

> My first notion is to begin a DAG of StreamTasks by collating/de-muxing
>these event-streams into type-specific topics in kafka

This would also be my instinct.

> Question: am I missing something? How can I choose messages in-order
>without blocking in cases where some streams are "drained?"

I don't believe that you can right now. What you need is a field in
IncomingMessageEnvelope that indicates whether the message is the "last"
one in the SSP. If it is, then you know it's safe to proceed by picking a
message from another SSP, and if not, you'll need to block. This seems
like a pretty reasonable use case to me. Can you open a JIRA for this?
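To sketch what I mean (in Python for brevity; a real implementation would be a Java MessageChooser, and the at-head flag doesn't exist yet, so all names here are invented), the chooser would only pick once every stream has either offered a message or reported that it is at head:

```python
class TimeAlignedChooser:
    """Pick the buffered message with the smallest timestamp, but only once
    every stream has either offered a message or reported it is at head.
    Hypothetical sketch; not the actual Samza MessageChooser API."""

    def __init__(self, streams):
        self.pending = {s: None for s in streams}   # stream -> buffered envelope
        self.at_head = {s: False for s in streams}  # stream -> drained flag

    def update(self, stream, envelope, is_at_head):
        # Called when the system hands us a new message (or a head notification).
        if envelope is not None:
            self.pending[stream] = envelope
        self.at_head[stream] = is_at_head

    def choose(self):
        # Block (return None) unless every stream is accounted for.
        for s, env in self.pending.items():
            if env is None and not self.at_head[s]:
                return None  # must wait: this stream may still hold an older message
        candidates = [(env["ts"], s) for s, env in self.pending.items() if env]
        if not candidates:
            return None
        _, s = min(candidates)
        chosen, self.pending[s] = self.pending[s], None
        return chosen
```

The key point is the first loop in choose(): without knowing a silent stream is drained, picking from another stream could emit a message out of timestamp order.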

In the meantime, I agree that your alternatives look a bit grim. One
question that I have is whether your topics are high-volume enough to make
blocking feasible for the time being. If you are doing a join between
player-level and purchase-complete events, and both of these streams are
high-volume enough, then you can simply block until the messages arrive,
and then compare timestamps on the messages to figure out whether it's
safe to pick a message.
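Once messages do arrive in timestamp order, the join logic itself is simple. Here's a runnable Python sketch (Samza tasks are Java/Scala, and field names like playerId are my assumptions based on your description): remember each player's latest level, and on a purchase emit a PlayerLevelAtPurchase record.

```python
class LevelAtPurchaseTask:
    """Illustrative join task: correctness depends on events being
    delivered in timestamp order across both input streams."""

    def __init__(self):
        self.levels = {}  # would be a local Samza key-value store in practice

    def process(self, event):
        if event["type"] == "PlayerLevelChanged":
            # Remember the player's current level.
            self.levels[event["playerId"]] = event["level"]
            return None
        if event["type"] == "PurchaseCompleted":
            # Emit the join record, keyed by purchaseId for the warehouse.
            return {
                "type": "PlayerLevelAtPurchase",
                "purchaseId": event["purchaseId"],
                "level": self.levels.get(event["playerId"]),
            }
```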

If your incoming streams are not high-volume enough to allow blocking, one
thing that you could do is inject heartbeat messages into the topics, so
that the chooser always knows what the current stream time is. That way,
even if no messages are available, you'll get a heartbeat message that
will tell you the current time in the stream. This is somewhat hacky, and
your StreamTask will have to disregard the heartbeat messages that are
chosen by the MessageChooser, but I think it should work.
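Here's a rough sketch of that heartbeat idea (Python for brevity, all names invented; a real version would be a Java MessageChooser): heartbeats advance each stream's clock, so the chooser knows when it is safe to release the oldest buffered message even if some streams are quiet.

```python
class HeartbeatAwareChooser:
    """Hypothetical chooser: every message, heartbeat or not, advances its
    stream's clock; a buffered message is released only once every stream's
    clock has reached its timestamp."""

    def __init__(self, streams):
        self.clock = {s: 0 for s in streams}     # last timestamp seen per stream
        self.pending = {s: None for s in streams}

    def update(self, stream, envelope):
        self.clock[stream] = envelope["ts"]      # heartbeats advance the clock too
        if envelope["type"] != "Heartbeat":
            self.pending[stream] = envelope      # only real messages are buffered

    def choose(self):
        candidates = [(env["ts"], s) for s, env in self.pending.items() if env]
        if not candidates:
            return None
        ts, s = min(candidates)
        # Safe to pick only once every stream's clock has passed this timestamp.
        if all(t >= ts for t in self.clock.values()):
            chosen, self.pending[s] = self.pending[s], None
            return chosen
        return None
```

Your StreamTask never sees the heartbeats here because the chooser drops them, though in real Samza the task would have to filter them out itself.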

In short, can you open a JIRA for this? If you can't wait for the next
release, you can either custom-roll Samza 0.8 with the patch in it, or
move forward with the heartbeat alternative, one of the ideas you mention
below, or perhaps an approach someone else suggests.

Sorry it's not better news. We'll try and fix this.


On 12/5/14 1:08 PM, "josh gruenberg" <> wrote:

>Hey Samza folks,
>I'm getting oriented with Samza. I'm very excited about the design, and
>hopeful that the capabilities involving partitioning and state might be a
>great fit for my team's needs. The functionality and documentation so far
>have been mostly clear and accurate (nice work, thanks!), and I have a
>simple flow up and running with a custom "serde" atop an in-house schema
>repository for avro-formatted event data.
>However, I'm struggling to satisfy a subtle requirement involving
>timestamp-ordering across streams, and I could use some help. I found some
>prior discussion related to this subject here:
>   -
>   -
>... but I didn't find a strategy that clearly leads me to a working
>solution.
>Here's my use-case: my application (a game-server platform that I've
>built) generates transactional event-logs that are spooled to
>flat-files. Each file contains all log-entries produced within a specific
>time-span for a shard of users (partitioned by hash of user-id). Within
>each file, entries are in strict timestamp order, with all event-types
>muxed together.
>(For background: this is where we arrive after first logging event-data
>into a sharded OLTP database, then siphoning it out in time-bucketed,
>sharded batches to cold storage.)
>Now, requirements:
>   1. provide summarizers that augment some event-types with context
>   provided by aggregating other types (I think you'd call this a
>   join), for ingestion into a data warehouse and/or dashboarding
>   2. support replaying history to correct bugs or add new summarizations
>   post-hoc
>Seems like a fit for Samza, no?
>A simple example: the analysis team wants to know a player's "level" in a
>game at the time when they complete a purchase. These details -- player
>level and purchase processing -- are disparate concerns in the application
>tier, so it's preferable to just record "facts" in the application, and
>compute context in the analytics back-end. To achieve this, a summarizer
>could subscribe to PlayerLevelChanged events and PurchaseCompleted events,
>and emit PlayerLevelAtPurchase records keyed by purchaseId (or eventId) for
>later joining in the data warehouse. CQRS FTW!
>The design:
>My first notion is to begin a DAG of StreamTasks by collating/de-muxing
>these event-streams into type-specific topics in kafka. Then each
>summarization could subscribe to the set of topics it wants, retain local
>Stores for summarization, and emit annotations/augmentations as output.
>The problem:
>In order to visit each input in the correct order, the event-type-specific
>streams need to be interleaved/time-aligned for each summarizer Task based
>on embedded timestamps. However, Samza offers no inter-stream ordering
>semantics, so this ordering would seem to be the job of a custom
>MessageChooser. But I don't see how a MessageChooser can do this without
>one additional bit of context: when we haven't been offered an item from
>one or more input streams, we need to know whether the missing stream(s)
>are "at head," to decide whether we must await an upcoming message for
>timestamp comparison. I don't see this offset-status metadata being exposed
>to the MessageChooser API (although the built-in BootstrappingChooser does
>appear to get some access to it via the SystemStreamMetadata provided in
>its constructor ..?).
>Question: am I missing something? How can I choose messages in-order
>without blocking in cases where some streams are "drained?"
>(Let me know if that's not clear.)
>As workarounds, alternative strategies that avoid the demuxed per-type
>topic streams might work: either 1. combining all of the summarizers into
>a single StreamTask that implements its own internal DAG, or 2. emitting
>Task-specific topics from the upstream source firehose that duplicate all
>relevant events into filtered, time-ordered muxed streams with only the
>necessary data-types. But alternative #1 seems problematic when we want
>to reprocess just a subtree of the DAG without reloading ALL input data,
>and #2 results in distasteful tight-coupling and a lot of redundant
>storage overhead, as well as requiring source-reload when new combinations
>are introduced.
>Thoughts? How would you approach this?
>Thanks in advance!
>-josh gruenberg (@joshng)
