kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: micro-batching in kafka streams
Date Wed, 28 Sep 2016 22:20:29 GMT
Ara,

I'd recommend using the interactive queries feature, available in the
upcoming 0.10.1 release in a couple of weeks, to query the current snapshot
of the state store.

We are going to write a blog post soon with step-by-step instructions on
leveraging this feature for use cases just like yours.
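
For reference, a minimal sketch of what the query side could look like
against the 0.10.1 API (the store name "counts-store" is a placeholder for
whatever name your aggregation registers, and "streams" is assumed to be a
running KafkaStreams instance):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Obtain a read-only view of the state store backing the aggregation.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("counts-store", QueryableStoreTypes.<String, Long>keyValueStore());

// Point lookup against the current snapshot of the store.
Long count = store.get("some-key");

// Range scan; the iterator must be closed when done.
try (KeyValueIterator<String, Long> it = store.range("a", "z")) {
    while (it.hasNext()) {
        KeyValue<String, Long> entry = it.next();
        System.out.println(entry.key + " -> " + entry.value);
    }
}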

Guozhang


On Wed, Sep 28, 2016 at 2:19 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com>
wrote:

> I need this ReadOnlyKeyValueStore.
>
> In my use case, I do an aggregateByKey(), so a KTable is formed, backed by
> a state store, which is then used by the next steps of the pipeline. Now,
> following the word count sample, I try to read that state store, and hence
> I end up sharing it with the actual pipeline. Kafka Streams doesn’t like
> that, because state stores are apparently assumed to be writable (indeed
> there’s a put() in there), and deep in the RocksDB state store code it
> tries to create a lock file (and indeed there is a file named LOCK in there).
>
> Ara.
>
> On Sep 28, 2016, at 2:03 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> Ara,
>
> Are you using the interactive queries feature but encountered issue due to
> locking file conflicts?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
>
> This is not expected to happen; if you are indeed using this feature, I'd
> like to learn more about your error scenario.
>
> Guozhang
>
>
> On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:
>
> One more thing:
>
> Guozhang pointed me towards this sample for micro-batching:
> https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
>
> This is a good example, and I successfully adapted it for my use case. BUT
> the main problem is this: my use case deals with writing hourly windows of
> data, so the data is already in a RocksDB file, yet I need to create a
> duplicate of that file just to be able to periodically do range scans on it
> and write to the external database. I did try to see if I could get the
> StateStore to read the same RocksDB file used by the aggregateByKey() that
> happens before this step, but it complained about not being able to lock
> the file. It would be great to be able to share the same underlying file
> between aggregateByKey() (or any other KTable-producing operation) and such
> periodic triggers.
>
> Ara.
>
> On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:
>
> Hi,
>
> So, here’s the situation:
>
> - For classic batching of writes to external systems, right now I simply
> hack it. This specific case is writing records to an Accumulo database, and
> I simply use the batch writer to batch writes; it flushes every second or
> so. I’ve added a shutdown hook to the JVM to flush upon graceful exit too.
> This is good enough for me, but obviously it’s not perfect. I wish Kafka
> Streams had some sort of trigger (based on x number of records processed,
> or y window of time passed). Which brings me to the next use case.
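>
> The hack, roughly, looks like the sketch below (the table name and tuning
> values are illustrative, and an existing Accumulo Connector instance is
> assumed):
>
> import java.util.concurrent.TimeUnit;
> import org.apache.accumulo.core.client.BatchWriter;
> import org.apache.accumulo.core.client.BatchWriterConfig;
> import org.apache.accumulo.core.data.Mutation;
> import org.apache.accumulo.core.data.Value;
>
> // Accumulo's BatchWriter already buffers mutations and flushes on a timer.
> BatchWriterConfig cfg = new BatchWriterConfig()
>     .setMaxLatency(1, TimeUnit.SECONDS)      // flush roughly every second
>     .setMaxMemory(10 * 1024 * 1024);         // or when the buffer fills up
> BatchWriter writer = connector.createBatchWriter("my_table", cfg);
>
> Mutation m = new Mutation("row-id");
> m.put("family", "qualifier", new Value("value".getBytes()));
> writer.addMutation(m);
>
> // Shutdown hook so a graceful exit flushes whatever is still buffered.
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     try { writer.close(); } catch (Exception e) { /* log and move on */ }
> }));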
>
> - I have some logic for calculating hourly statistics, so I’m dealing with
> windowed data already. These stats then need to be written to an external
> database for use by user-facing systems. Obviously I need to write the
> final result for each hourly window after we’re past that window of time
> (or I could write as often as it gets updated, but the problem is that the
> external database is not as fast as Kafka). I do understand that I need to
> take into account the fact that events may arrive out of order, and there
> may be some records arriving a little bit after I’ve considered the
> previous window over and have moved on to the next one. I’d like to have
> some sort of hourly trigger (not just a pure x-milliseconds trigger, but
> also support for cron-style timing), and then also have the option to
> update the stats I’ve already written for a window for a set amount of
> time after the trigger fired, so that I can deal with events which arrive
> after the write for that window. And then there’s a cut-off point after
> which updating the stats for a very old window is just not worth it.
> Something like this DSL:
>
> kstream.trigger(
>     Cron.of("0 * * * *"),   /* when to trigger */
>     Hours.toMillis(1),      /* update every hour afterwards */
>     Hours.toMillis(24),     /* discard changes older than this */
>     (windowStartTime, windowedKey, record) -> { /* write */ }
> );
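>
> The closest approximation I can see today would be a manual low-level
> Processor, something like the sketch below (the store name "hourly-stats"
> and writeToExternalDb() are made up; note that punctuate() runs on the
> processing-time clock, so the cron-style and event-time semantics above
> would still have to be simulated on top of it):
>
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class HourlyFlushProcessor implements Processor<String, Long> {
>     private KeyValueStore<String, Long> store;
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(ProcessorContext context) {
>         // State store acting as the micro-batch buffer.
>         this.store = (KeyValueStore<String, Long>) context.getStateStore("hourly-stats");
>         context.schedule(60 * 60 * 1000L);   // request punctuate() roughly hourly
>     }
>
>     @Override
>     public void process(String key, Long value) {
>         store.put(key, value);               // accumulate until the next punctuation
>     }
>
>     @Override
>     public void punctuate(long timestamp) {
>         // Drain the buffered stats into the external database in one batch.
>         try (KeyValueIterator<String, Long> it = store.all()) {
>             while (it.hasNext()) {
>                 KeyValue<String, Long> entry = it.next();
>                 writeToExternalDb(entry.key, entry.value);
>             }
>         }
>     }
>
>     @Override
>     public void close() {}
>
>     private void writeToExternalDb(String key, Long value) { /* made up */ }
> }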
>
> The tricky part is reconciling event-source time and event-processing
> time. Clearly this trigger operates in event-processing time, whereas the
> data is most probably in event-source time.
>
> Something like that :)
>
> Ara.
>
> On Sep 26, 2016, at 1:59 AM, Michael Noll <michael@confluent.io> wrote:
>
> Ara,
>
> may I ask why you need to use micro-batching in the first place?
>
> The reason I am asking: typically, when people talk about micro-batching,
> they are referring to the way some originally batch-based stream processing
> tools "bolt on" real-time processing by making their batch sizes really
> small.  Here, micro-batching belongs to the realm of the inner workings of
> the stream processing tool.
>
> Orthogonal to that, you have features/operations such as windowing,
> triggers, etc. that -- unlike micro-batching -- allow you, as the user of
> the stream processing tool, to define exactly which computation logic you
> need.  Whether, say, windowing is or is not computed via micro-batching
> behind the scenes should (at least in an ideal world) be of no concern to
> the user.
>
> -Michael
>
> On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:
>
> Hi,
>
> What’s the best way to do micro-batching in Kafka Streams? Are there any
> plans for a built-in mechanism? Perhaps a StateStore could act as the
> buffer? What exactly are ProcessorContext.schedule()/punctuate() for? They
> don’t seem to be used anywhere.
>
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
>
> Ara.
>
> --
> -- Guozhang
>



-- 
-- Guozhang
