samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yi Pan <>
Subject Re: buffering records for join
Date Wed, 11 Oct 2017 23:37:43 GMT
Hi, Jef,

I would recommend that you use a KV store to buffer the messages for join.
The logic would be more predictable and state is also durable. In
StreamTask.process(), you can do some pseudo code like below:
public void process(IncomingMessageEnvelope msg, MessageCollector
collector, TaskCoordinator coordinator) {
   if (msg is from streamA) {
      storeA.put(msg.key, msg);
   } else {
      storeB.put(msg.key, msg);
   if (joinCondition is triggered) {
      doJoin(storeA, storeB);

Make sure that you configure storeA and storeB w/ changelog s.t. they can
be recovered. Then, you don't need to worry about the data loss, since
before the auto-checkpoint, your buffered messages are flushed to disk and
changelog via storeA and storeB. If you do not want to delete each and
every buffered message after join, you can set TTL for each store if you
are using RocksDB store.

We are also actively working on build-in join operator in Samza high-level
APIs. The new high-level APIs are already released in Samza 0.13.1 with the
feature preview here: Feel free to take
a look and try it. We love to hear about feedbacks now. The current version
does not support durable state in join yet. We are actively working on
durable state support in he next release. Note that the high-level API is
still in early evolution and might change in the next two releases.



On Wed, Oct 11, 2017 at 1:56 PM, Jef G <> wrote:

> Hello. My team is looking into Samza for doing real-time processing. We
> would like to run a directed graph of jobs, where the records in each job's
> input streams are joined on a common key. We have functionality to perform
> the join by buffering records from the input streams until certain
> conditions are met and then passing them on.
> We are wondering about the best way to integrate this functionality into a
> Samza job. After looking over the API we see two possibilities:
> 1. Use a StreamTask that adds records to a buffer. This is the method that
> the "ad event" example uses. But we am concerned that the framework commits
> a StreamTask's offset after process() completes, so if the job fails,
> records in the buffer are permanently lost.
> 2. Use an AsyncTask that adds records to a buffer. Also add TaskCallbacks
> to the buffer. When records are eventually joined and processed, commit
> their callbacks. This method seems promising but it requires setting
> task.max.concurrency very high - possibly in the tens of thousands in our
> case. Are we likely to run into any issues doing that?
> Are there any other options that we overlooked? What is the best approach?
> -Jef G

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message