samza-dev mailing list archives

From Yi Pan <nickpa...@gmail.com>
Subject Re: buffering records for join
Date Thu, 12 Oct 2017 21:49:50 GMT
Oh, just want to follow up on the question about AsyncStreamTask. The main
consideration when setting a large task.max.concurrency value is the
memory required.

And assuming your join function works like micro-batches (i.e. buffer all
input messages from all streams for a while, apply the join function, and
discard all buffered messages), you can make it work w/o a local KV store.
If any of your buffered messages is not for the current join but is needed
for a future join, you may have to use a local KV store to ensure
correctness and no data loss across container restarts.
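
A minimal sketch of the micro-batch case, in plain Java without any Samza
types. The class MicroBatchJoiner and its methods are hypothetical names
used only for illustration, and a two-stream join on string keys is assumed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory micro-batch joiner; not part of the Samza API. */
class MicroBatchJoiner {
    // One buffer per input stream, keyed by the join key.
    private final Map<String, String> bufferA = new HashMap<>();
    private final Map<String, String> bufferB = new HashMap<>();

    /** Buffers a record from stream "A" (anything else is treated as "B"). */
    void add(String stream, String key, String value) {
        if ("A".equals(stream)) {
            bufferA.put(key, value);
        } else {
            bufferB.put(key, value);
        }
    }

    /**
     * Joins every key present in both buffers, then discards ALL buffered
     * records, whether or not they found a match in this batch.
     */
    List<String> joinAndClear() {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> a : bufferA.entrySet()) {
            String b = bufferB.get(a.getKey());
            if (b != null) {
                joined.add(a.getKey() + ":" + a.getValue() + "+" + b);
            }
        }
        bufferA.clear();
        bufferB.clear();
        return joined;
    }

    /** Number of records currently buffered across both streams. */
    int buffered() {
        return bufferA.size() + bufferB.size();
    }
}
```

In a Samza job, add() would be called from process() and joinAndClear()
from a periodic hook such as WindowableTask.window(). Note that a record
whose partner arrives only in a later batch is dropped here, which is
exactly the case where a local KV store is needed instead.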

-Yi

On Thu, Oct 12, 2017 at 11:40 AM, Yi Pan <nickpan47@gmail.com> wrote:

> Hi, Jef,
>
> What I suggest is exactly an in-process, in-memory KV store. Samza has two
> types of such built-in KV stores: in-memory and RocksDB. Both can be backed
> by a changelog topic in Kafka as the failure recovery mechanism (i.e. if a
> container fails, it can reseed the whole store from the changelog topic in
> Kafka). Since they are built-in KV stores in Samza, no additional external
> system is introduced in the solution. Here is the doc link for state
> management in Samza:
> http://samza.apache.org/learn/documentation/0.13/container/state-management.html
> Please read the "Local State" section.
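>
> A minimal configuration sketch for the two built-in store types, each
> backed by a changelog. The store names (store-a, store-b), the serde
> choices, and the changelog topic names are assumptions for illustration,
> and a system named "kafka" is assumed to be configured elsewhere in the
> job. Please check the keys against the configuration reference for your
> Samza version:
>
> {code}
> # RocksDB store, persisted on local disk
> stores.store-a.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> stores.store-a.key.serde=string
> stores.store-a.msg.serde=json
> stores.store-a.changelog=kafka.store-a-changelog
>
> # In-memory store
> stores.store-b.factory=org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory
> stores.store-b.key.serde=string
> stores.store-b.msg.serde=json
> stores.store-b.changelog=kafka.store-b-changelog
>
> # Serdes referenced by the stores above
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> {code}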
>
> The Samza high-level API is not based on the KStream API. It supports only
> two-way joins for now, but as long as the join keys are in the same
> partition, the result can stay in memory and does not need to go through
> another persistent topic.
>
> Best!
>
> -Yi
>
> On Thu, Oct 12, 2017 at 10:00 AM, Jef G <jefg@dataminr.com> wrote:
>
>> Yi, thanks for your detailed reply!
>>
>> I believe what you are suggesting regarding a KV store is to set up a
>> remote durable system to maintain the join state. That way if a node dies
>> and Samza restarts the task on another node, the join state is still
>> available. Is that correct?
>>
>> This approach is certainly an option. However, we were hoping to use an
>> in-process in-memory KV store, as a remote store would introduce a lot of
>> latency for us. In some cases we would have to make more than 100,000
>> round trips per second to the KV store for a single stream, and we would
>> want to be able to scale beyond that. It also introduces some complexity
>> and another point of failure.
>>
>> Regarding using AsyncStreamTask with a very large (100,000)
>> task.max.concurrency, is that a bad idea?
>>
>> The high-level API is based on the KStream API, right? Our jobs will
>> sometimes need to join as many as 20 input streams. I believe currently
>> Samza (and KStream) only supports a binary join and if that is the case,
>> we would need 19 binary joins. The KStream doc suggests that all
>> intermediate results are persisted, so many chained joins might be very
>> inefficient. If so we would prefer to use the "classic" API.
>>
>> -Jef
>>
>> On Wed, Oct 11, 2017 at 7:37 PM, Yi Pan <nickpan47@gmail.com> wrote:
>>
>> > Hi, Jef,
>> >
>> > I would recommend that you use a KV store to buffer the messages for
>> > the join. The logic would be more predictable and the state is also
>> > durable. In StreamTask.process(), you can do something like the
>> > pseudocode below:
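>> > {code}
>> > // storeA and storeB are KeyValueStore instances obtained in init().
>> > public void process(IncomingMessageEnvelope msg, MessageCollector collector,
>> >     TaskCoordinator coordinator) {
>> >    // Route each message to the buffer for its input stream.
>> >    String stream = msg.getSystemStreamPartition().getStream();
>> >    if ("streamA".equals(stream)) {
>> >       storeA.put(msg.getKey(), msg.getMessage());
>> >    } else {
>> >       storeB.put(msg.getKey(), msg.getMessage());
>> >    }
>> >    if (joinConditionTriggered()) {   // application-specific check
>> >       doJoin(storeA, storeB);        // application-specific join logic
>> >    }
>> > }
>> > {code}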
>> >
>> > Make sure that you configure storeA and storeB w/ a changelog s.t. they
>> > can be recovered. Then you don't need to worry about data loss, since
>> > before the auto-checkpoint, your buffered messages are flushed to disk
>> > and to the changelog via storeA and storeB. If you do not want to delete
>> > each and every buffered message after the join, you can set a TTL for
>> > each store if you are using the RocksDB store.
>> >
>> > We are also actively working on a built-in join operator in the Samza
>> > high-level APIs. The new high-level APIs are already released in Samza
>> > 0.13.1, with the feature preview here:
>> > http://samza.apache.org/startup/preview/#high-level-api
>> > Feel free to take a look and try it. We would love to hear your
>> > feedback. The current version does not support durable state in joins
>> > yet. We are actively working on durable state support in the next
>> > release. Note that the high-level API is still in early evolution and
>> > might change in the next two releases.
>> >
>> > Best!
>> >
>> > -Yi
>> >
>> > On Wed, Oct 11, 2017 at 1:56 PM, Jef G <jefg@dataminr.com> wrote:
>> >
>> > > Hello. My team is looking into Samza for doing real-time processing.
>> > > We would like to run a directed graph of jobs, where the records in
>> > > each job's input streams are joined on a common key. We have
>> > > functionality to perform the join by buffering records from the input
>> > > streams until certain conditions are met and then passing them on.
>> > >
>> > > We are wondering about the best way to integrate this functionality
>> > > into a Samza job. After looking over the API we see two possibilities:
>> > >
>> > > 1. Use a StreamTask that adds records to a buffer. This is the method
>> > > that the "ad event" example uses. But we are concerned that the
>> > > framework commits a StreamTask's offset after process() completes, so
>> > > if the job fails, records in the buffer are permanently lost.
>> > >
>> > > 2. Use an AsyncStreamTask that adds records to a buffer. Also add
>> > > TaskCallbacks to the buffer. When records are eventually joined and
>> > > processed, commit their callbacks. This method seems promising but it
>> > > requires setting task.max.concurrency very high - possibly in the tens
>> > > of thousands in our case. Are we likely to run into any issues doing
>> > > that?
>> > >
>> > > Are there any other options that we overlooked? What is the best
>> > > approach?
>> > >
>> > > -Jef G
>> > >
>> >
>>
>>
>>
>> --
>> Jef G
>> Senior Data Scientist | Dataminr | dataminr.com
>> 99 Madison Ave, 3rd Floor | New York, NY 10016
>> jefg@dataminr.com
>>
>
>
