samza-dev mailing list archives

From josh gruenberg <>
Subject Inter-stream time-alignment
Date Fri, 05 Dec 2014 21:08:39 GMT
Hey Samza folks,

I'm getting oriented with Samza. I'm very excited about the design, and
hopeful that the capabilities involving partitioning and state might be a
great fit for my team's needs. The functionality and documentation so far
have been mostly clear and accurate (nice work, thanks!), and I have a
simple flow up and running with a custom "serde" atop an in-house schema
repository for avro-formatted event data.

However, I'm struggling to satisfy a subtle requirement involving
timestamp-ordering across streams, and I could use some help. I found some
prior discussion related to this subject, but I didn't find a strategy that
clearly leads me to a working solution.

Here's my use-case: my application (a game-server platform that I've built)
generates transactional event-logs that are spooled to
flat-files. Each file contains all log-entries produced within a specific
time-span for a shard of users (partitioned by hash of user-id). Within
each file, entries are in strict timestamp order, with all event-types
muxed together.

(For background: this is where we arrive after first logging event-data
into a sharded OLTP database, then siphoning it out in time-bucketed,
sharded batches to cold storage.)

Now, requirements:

   1. provide summarizers that augment some event-types with context
   provided by aggregating other types (I think you'd call this a stream-table
   join), for ingestion into a data warehouse and/or dashboarding
   2. support replaying history to correct bugs or add new summarizations

Seems like a fit for Samza, no?

A simple example: the analysis team wants to know a player's "level" in a
game at the time when they complete a purchase. These details -- player
level and purchase processing -- are disparate concerns in the application
tier, so it's preferable to just record "facts" in the application, and
compute context in the analytics back-end. To achieve this, a summarizer
could subscribe to PlayerLevelChanged events and PurchaseCompleted events,
and emit PlayerLevelAtPurchase records keyed by purchaseId (or eventId) for
later joining in the data warehouse. CQRS FTW!
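
To make that concrete, here's a plain-Java sketch of the summarizer's join
logic. This is a toy model, not Samza code: in a real job the map below would
be a local KeyValueStore, the handlers would be dispatched from
StreamTask.process(), and all the names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the PlayerLevelAtPurchase summarizer. In Samza the
// latestLevel map would be a local KeyValueStore, and the two handlers
// would be dispatched from StreamTask.process() based on event type.
public class LevelAtPurchaseJoin {
    // "Table" side of the stream-table join: latest known level per player.
    private final Map<String, Integer> latestLevel = new HashMap<>();

    // PlayerLevelChanged: update local state; nothing to emit.
    public void onLevelChanged(String playerId, int newLevel) {
        latestLevel.put(playerId, newLevel);
    }

    // PurchaseCompleted: join against local state and return a
    // PlayerLevelAtPurchase record keyed by purchaseId.
    public String onPurchaseCompleted(String purchaseId, String playerId) {
        Integer level = latestLevel.get(playerId);
        return purchaseId + ":" + (level == null ? "unknown" : level);
    }
}
```

Of course, this only produces correct answers if each level-change is
processed before the purchases that follow it in time -- which is exactly
where the ordering requirement comes from.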

The design:

My first notion is to begin a DAG of StreamTasks by collating/de-muxing
these event-streams into type-specific topics in kafka. Then each
summarization could subscribe to the set of topics it wants, retain local
Stores for summarization, and emit annotations/augmentations as appropriate.
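
The first demux stage could be as simple as routing on event type while
keeping the user-id partition key, so the type-specific topics stay
co-partitioned for downstream joins. A minimal model (the topic-naming
convention and method names are my own invention, not anything Samza
prescribes; in Samza this would be a StreamTask sending
OutgoingMessageEnvelopes):

```java
// Toy model of the first-stage demux. The "events." topic prefix is an
// invented convention; a real Samza task would wrap the routed message in
// an OutgoingMessageEnvelope and hand it to the MessageCollector.
public class EventDemux {
    // Route each muxed log entry to a type-specific topic.
    public static String topicFor(String eventType) {
        return "events." + eventType;
    }

    // Keep partitioning by user-id so every type-specific topic is
    // co-partitioned with the others for the stream-table joins.
    public static String partitionKeyFor(String userId) {
        return userId;
    }
}
```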

The problem:

In order to visit each input in the correct order, the event-type-specific
streams need to be interleaved/time-aligned for each summarizer Task based
on embedded timestamps. However, Samza offers no inter-stream ordering
semantics, so this ordering would seem to be the job of a custom
MessageChooser. But I don't see how a MessageChooser can do this without
one additional bit of context: when we haven't been offered an item from
one or more input streams, we need to know whether the missing stream(s)
are "at head," to decide whether we must await an upcoming message for
timestamp comparison. I don't see this offset-status metadata being exposed
to the MessageChooser API (although the built-in BootstrappingChooser does
appear to get some access to it via the SystemStreamMetadata provided in
its constructor...?).
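
To make the gap concrete, here's a toy model of the merge I'd want a chooser
to perform. This is not the MessageChooser interface -- the names are
invented, and markAtHead() stands in for precisely the signal I can't find
in the API:

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of a timestamp-merging chooser over N streams. It can only
// safely emit the globally-oldest message when every stream either has a
// buffered message or is known to be caught up ("at head") -- and that
// at-head signal is what I can't find in the MessageChooser API.
public class TimeAlignedChooser {
    // stream name -> queued [timestamp, payload] pairs, oldest first
    private final Map<String, Deque<long[]>> buffered = new HashMap<>();
    private final Set<String> atHead = new HashSet<>();

    public TimeAlignedChooser(Collection<String> streams) {
        for (String s : streams) buffered.put(s, new ArrayDeque<>());
    }

    // A new message arrived on this stream (assumed in timestamp order).
    public void update(String stream, long timestamp, long payload) {
        buffered.get(stream).add(new long[] { timestamp, payload });
    }

    // The hypothetical missing signal: this stream's consumer is caught up.
    public void markAtHead(String stream) {
        atHead.add(stream);
    }

    // Returns the payload of the globally-oldest buffered message, or null
    // if some stream is empty and not known to be at head (we must wait:
    // that stream could still deliver an older message).
    public Long choose() {
        String best = null;
        for (Map.Entry<String, Deque<long[]>> e : buffered.entrySet()) {
            if (e.getValue().isEmpty()) {
                if (!atHead.contains(e.getKey())) return null;
                continue;
            }
            if (best == null || e.getValue().peek()[0] < buffered.get(best).peek()[0]) {
                best = e.getKey();
            }
        }
        return best == null ? null : buffered.get(best).poll()[1];
    }
}
```

Without the at-head signal, choose() would have to either block indefinitely
on a drained stream or emit potentially out-of-order messages.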

Question: am I missing something? How can I choose messages in-order
without blocking in cases where some streams are "drained?"

(Let me know if that's not clear.)

As workarounds, alternative strategies that avoid the demuxed type-specific
topic streams might work, instead either:

   1. combining all of the summarizers into a single StreamTask that
   implements its own internal DAG, or
   2. emitting Task-specific topics from the upstream source firehose that
   duplicate all relevant events into filtered, time-ordered muxed
   sub-streams with only the necessary data-types.

But alternative #1 seems problematic if we want to reprocess just a subtree
of the DAG without reloading ALL input data, and #2 results in distasteful
tight coupling and a lot of redundant storage overhead, as well as requiring
a source reload whenever new combinations are introduced.

Thoughts? How would you approach this?

Thanks in advance!
-josh gruenberg (@joshng)
