kafka-users mailing list archives

From Ara Ebrahimi <ara.ebrah...@argyledata.com>
Subject Re: micro-batching in kafka streams
Date Wed, 28 Sep 2016 21:19:58 GMT
I need this ReadOnlyKeyValueStore.

In my use case, I do an aggregateByKey(), so a KTable is formed, backed by a state store.
This is then used by the next steps of the pipeline. Now, following the word count sample, I try
to read that state store, so I end up sharing it with the actual pipeline. Kafka Streams
doesn’t like that, apparently because state stores are assumed to be writable (indeed there’s
a put() in there): deep in the rocksdb state store code it tries to create a lock file
(and indeed there is a file named LOCK in there).

Ara.

On Sep 28, 2016, at 2:03 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

Ara,

Are you using the interactive queries feature and running into issues due to
lock file conflicts?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

This is not expected to happen. If you are indeed using this feature, I'd
like to learn more about your error scenario.

Guozhang


On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:

One more thing:

Guozhang pointed me towards this sample for micro-batching:
https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

This is a good example and I successfully adapted it for my use case.
BUT the main problem is this: my use case deals with writing hourly windows
of data, so the data is already in a rocksdb file, yet I need to create a
duplicate of the same file just to be able to periodically do range scans on
it and write to the external database. I did try to get a StateStore to read
the same rocksdb file used by the aggregateByKey step that runs before this
one, but it complained about not being able to lock the file. It would be
great to be able to share the same underlying file between aggregateByKey (or
any other such KTable-producing operation) and such periodic triggers.
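
The periodic range scan described above can be sketched roughly as follows. This is only an in-memory stand-in built on java.util.TreeMap, not the rocksdb-backed store and not a Kafka Streams API; the class name WindowStatsStore and its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a sorted store keyed by window start time. */
final class WindowStatsStore {
    // Outer key: window start in epoch millis; inner map: per-key counts for that window.
    private final NavigableMap<Long, Map<String, Long>> byWindowStart = new TreeMap<>();

    /** Adds one to the count of the given key in the given window. */
    void increment(long windowStartMs, String key) {
        byWindowStart.computeIfAbsent(windowStartMs, w -> new TreeMap<>())
                     .merge(key, 1L, Long::sum);
    }

    /** Range scan: all windows with fromMs <= start < toMs, in ascending start order. */
    List<Map.Entry<Long, Map<String, Long>>> scan(long fromMs, long toMs) {
        return new ArrayList<>(byWindowStart.subMap(fromMs, true, toMs, false).entrySet());
    }
}
```

A periodic job would call scan() over the windows that have just closed and hand each entry to the external database writer.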

Ara.

On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:

Hi,

So, here’s the situation:

- for classic batching of writes to external systems, right now I simply
hack it. This specific case is writing records to an Accumulo database, and
I simply use the batch writer to batch writes, and it flushes every second
or so. I’ve added a shutdown hook to the jvm to flush upon graceful exit
too. This is good enough for me, but obviously it’s not perfect. I wish
Kafka Streams had some sort of a trigger (based on x number of records
processed, or y window of time passed). Which brings me to the next use
case.

- I have some logic for calculating hourly statistics. So I’m dealing with
Windowed data already. These stats then need to be written to an external
database for use by user facing systems. Obviously I need to write the
final result for each hourly window after we’re past that window of time
(or I can write as often as it gets updated but the problem is that the
external database is not as fast as Kafka). I do understand that I need to
take into account the fact that events may arrive out of order and there
may be some records arriving a little bit after I’ve considered the
previous window over and have moved to the next one. I’d like to have some
sort of an hourly trigger (not just pure x milliseconds trigger, but also
support for cron style timing) and then also have the option to update the
stats I’ve already written for a window a set amount of time after the
trigger got triggered so that I can deal with events which arrive after the
write for that window. And then there’s a cut-off point after which
updating the stats for a very old window is just not worth it. Something
like this DSL:

kstream.trigger(
    /* when to trigger */ Cron.of("0 * * * *"),
    /* update every hour afterwards */ Hours.toMillis(1),
    /* discard changes older than this */ Hours.toMillis(24),
    /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ });

The tricky part is reconciling event source time and event processing
time. Clearly this trigger operates in event processing time, whereas the
data is most probably in event source time.
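
To make the two clocks concrete, here is a minimal sketch of the decision such a trigger would have to make for each update. It assumes hourly windows aligned to the epoch; HourlyWindowPolicy, its Action values and the cutoff parameter are hypothetical names for illustration, not anything Kafka Streams provides.

```java
/** Hypothetical policy: what to do with an update to an hourly window. */
final class HourlyWindowPolicy {
    enum Action { BUFFER, REWRITE, DISCARD }

    static final long HOUR_MS = 60L * 60L * 1000L;

    // Updates to windows that ended longer ago than this (in processing time) are dropped.
    private final long cutoffMs;

    HourlyWindowPolicy(long cutoffMs) {
        this.cutoffMs = cutoffMs;
    }

    /** Start of the epoch-aligned hourly window containing the event (source) timestamp. */
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, HOUR_MS);
    }

    /** Compares the window's end (event time) with the wall clock (processing time). */
    Action onUpdate(long eventTimeMs, long processingTimeMs) {
        long windowEnd = windowStart(eventTimeMs) + HOUR_MS;
        if (processingTimeMs < windowEnd) {
            return Action.BUFFER;   // window still open: wait for the hourly trigger
        }
        if (processingTimeMs - windowEnd <= cutoffMs) {
            return Action.REWRITE;  // late event: update the stats already written
        }
        return Action.DISCARD;      // too old to be worth updating
    }
}
```

With a 24-hour cutoff, an event stamped 00:10 that is processed at 02:00 falls under REWRITE, while the same event processed at 26:00 is discarded.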

Something like that :)

Ara.

On Sep 26, 2016, at 1:59 AM, Michael Noll <michael@confluent.io> wrote:

Ara,

may I ask why you need to use micro-batching in the first place?

Reason why I am asking: Typically, when people talk about micro-batching,
they are referring to the way some originally batch-based stream processing
tools "bolt on" real-time processing by making their batch sizes really
small.  Here, micro-batching belongs to the realm of the inner workings of
the stream processing tool.

Orthogonally to that, you have features/operations such as windowing,
triggers, etc. that -- unlike micro-batching -- allow you as the user of
the stream processing tool to define which exact computation logic you
need.  Whether or not, say, windowing is computed via
micro-batching behind the scenes should (at least in an ideal world) be of
no concern to the user.

-Michael





On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:

Hi,

What’s the best way to do micro-batching in Kafka Streams? Any plans for a
built-in mechanism? Perhaps StateStore could act as the buffer? What
exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
to be used anywhere?

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
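
As a rough illustration of the buffering idea in the question, a micro-batcher that flushes on either a record count or an age limit might look like the sketch below. It is plain Java with no Kafka dependency; MicroBatcher, add(), tick() and the sink callback are hypothetical names, and the periodic tick() call is assumed to come from some scheduler (for example a timer, or a punctuate-style callback).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer that flushes when a size or an age threshold is reached. */
final class MicroBatcher<T> {
    private final int maxRecords;
    private final long maxAgeMillis;
    private final Consumer<List<T>> sink;   // e.g. a wrapper around an external DB writer
    private final List<T> buffer = new ArrayList<>();
    private long firstRecordAtMillis = -1L;

    MicroBatcher(int maxRecords, long maxAgeMillis, Consumer<List<T>> sink) {
        this.maxRecords = maxRecords;
        this.maxAgeMillis = maxAgeMillis;
        this.sink = sink;
    }

    /** Adds a record; the caller supplies the clock so the logic stays testable. */
    void add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRecordAtMillis = nowMillis;
        }
        buffer.add(record);
        if (buffer.size() >= maxRecords) {
            flush();
        }
    }

    /** Call periodically to enforce the age threshold on the oldest buffered record. */
    void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstRecordAtMillis >= maxAgeMillis) {
            flush();
        }
    }

    /** Hands a copy of the buffered records to the sink and clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Calling flush() from a JVM shutdown hook would cover the graceful-exit case; records buffered at the time of a crash are still lost unless the buffer itself is durable.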

Ara.



________________________________

This message is for the designated recipient only and may contain
privileged, proprietary, or otherwise confidential information. If you have
received it in error, please notify the sender immediately and delete the
original. Any other use of the e-mail by you is prohibited. Thank you in
advance for your cooperation.

________________________________








--
-- Guozhang


