kafka-users mailing list archives

From Bruno Bottazzini <bruno.bottazz...@targatelematics.com>
Subject Re: Use Kafka Streams for windowing data and processing each window at once
Date Tue, 24 Jul 2018 13:25:46 GMT
Hello Guozhang,

I understand. 

Thank you very much for the answer.

Best Regards,

Bruno

On Mon, 2018-07-23 at 16:35 -0700, Guozhang Wang wrote:
> I see.
> 
> In that case, one workaround would be to query the state store
> directly, from a `punctuation` function, once you know that no more
> updates will be applied to that store. Note that punctuation is a
> feature that's only available in the Processor API, but you can always
> add such a lower-level implementation into your DSL topology by
> calling `KStream#process()` / `transform()`.
> 
> What you can do, then, is schedule a stream-time based punctuation
> function with the scheduling interval equal to the window size.
> Whenever the punctuation function is triggered, it receives the
> current stream time as its parameter, and from that stream time you
> can determine which window(s) have become final. For example, if your
> window length is 10 and your grace period is configured as 5, and your
> punctuation function triggers at 25, then you know that the window
> [0, 10) must already be final. You can then access the windowed state
> store directly, using the fetchAll(fromTimestamp, toTimestamp) API, to
> fetch all the keys in that range and generate the global report.
> 
> 
> Guozhang
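
A minimal sketch of the punctuation workaround Guozhang describes,
against the Processor API as of Kafka 2.0. The store name
"counts-store", the Long value type, and the bookkeeping are
assumptions; the window length (10) and grace period (5) follow the
example above:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;

public class FinalWindowReporter implements Processor<String, Long> {

    private static final String STORE_NAME = "counts-store"; // assumed store name
    private static final long WINDOW_MS = 10L;               // window length, per the example
    private static final long GRACE_MS = 5L;                 // grace period, per the example

    private WindowStore<String, Long> store;
    private long reportedUpTo = 0L; // window starts below this are already reported

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (WindowStore<String, Long>) context.getStateStore(STORE_NAME);
        // Stream-time punctuation, scheduled at the window size.
        context.schedule(WINDOW_MS, PunctuationType.STREAM_TIME, this::emitFinalWindows);
    }

    @Override
    public void process(final String key, final Long value) {
        // The upstream windowed aggregation maintains the store;
        // nothing to do per record here.
    }

    private void emitFinalWindows(final long streamTime) {
        // A window [start, start + WINDOW_MS) is final once
        // start + WINDOW_MS + GRACE_MS <= streamTime; e.g. at stream
        // time 25, windows starting at or before 10 are final.
        final long latestFinalStart = streamTime - WINDOW_MS - GRACE_MS;
        if (latestFinalStart < reportedUpTo) {
            return; // no new window has become final yet
        }
        final List<KeyValue<Windowed<String>, Long>> report = new ArrayList<>();
        try (final KeyValueIterator<Windowed<String>, Long> iter =
                 store.fetchAll(reportedUpTo, latestFinalStart)) {
            while (iter.hasNext()) {
                report.add(iter.next());
            }
        }
        reportedUpTo = latestFinalStart + 1;
        // `report` now holds every key of the closed window(s):
        // build and emit the global report here.
    }

    @Override
    public void close() { }
}

Such a processor would be attached to the DSL topology via
KStream#process(FinalWindowReporter::new, "counts-store"), with the
upstream windowed aggregation materialized under that same store name.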
> 
> 
> On Mon, Jul 23, 2018 at 12:48 AM, Bruno Bottazzini <
> bruno.bottazzini@targatelematics.com> wrote:
> 
> > 
> > Hello Guozhang,
> > 
> > Managing to have a stream with just one record per key per window
> > is definitely something we want to achieve.
> > 
> > But that is not all. We also want to process the whole window at
> > once, so our callback would receive just one collection of
> > aggregates per window, once it is ready.
> > 
> > We would probably need to receive the collection as an iterable
> > that dynamically loads the window in chunks, since the window could
> > be unmanageable as a single object.
> > 
> > This way we could produce one report for each window. In the
> > example "Final window results per key" you manage to send an alert
> > for each user with fewer than three events, but we also want to
> > collect, in one report, the list of all users with fewer than three
> > events in the one-hour window.
> > 
> > Best Regards,
> > 
> > Bruno
> > 
> > On Fri, 2018-07-20 at 09:11 -0700, Guozhang Wang wrote:
> > > 
> > > Hello Bruno,
> > > 
> > > We've discussed the callback approach before, but then we
> > > realized that this can still be achieved with the proposed API.
> > > In the "Final window results per key" section, John showed how to
> > > do that. Note that the resulting stream will have exactly one
> > > record per key, with the value representing the "final result"
> > > for that key.
> > > 
> > > 
> > > Guozhang
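
For reference, the "final window results per key" pattern John wrote up
in KIP-328 is built on the suppress() operator, which later shipped in
Kafka 2.1. A sketch, with made-up topic names, the one-hour window from
the example, and an arbitrary ten-minute grace period:

import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Count events per user per 1-hour window, emit exactly one final
// result per key once the window closes, then alert on quiet users.
final StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("events")
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
       .count()
       // Suppress intermediate updates; forward only the final result
       // per window.
       .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
       .toStream()
       .filter((windowedUserId, count) -> count < 3)
       .map((windowedUserId, count) -> KeyValue.pair(windowedUserId.key(), count))
       .to("low-activity-alerts");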
> > > 
> > > 
> > > On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini <
> > > bruno.bottazzini@targatelematics.com> wrote:
> > > 
> > > > 
> > > > 
> > > > Bill,
> > > > 
> > > > After reading the documentation, it does look really close to
> > > > our need; however, I have a doubt about it.
> > > > 
> > > > One small question.
> > > > 
> > > > I was also expecting a callback that Kafka would invoke after
> > > > the whole period has passed, and that would receive an iterable
> > > > object containing all the aggregated information collected in
> > > > that period.
> > > > 
> > > > Will that be possible when using grace() or suppress()?
> > > > 
> > > > Best Regards,
> > > > 
> > > > Bruno
> > > > 
> > > > On Thu, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote:
> > > > > 
> > > > > 
> > > > > Hi Bruno,
> > > > > 
> > > > > What you are asking for is a common request. There is a KIP
> > > > > in the works,
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> > > > > that should suit the requirements you've outlined.
> > > > > 
> > > > > In the meantime, I'll see if I can come up with an
> > > > > alternative
> > > > > approach
> > > > > over the next few days.
> > > > > 
> > > > > -Bill
> > > > > 
> > > > > On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini <
> > > > > bruno.bottazzini@targatelematics.com> wrote:
> > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > Hello,
> > > > > > 
> > > > > > We have a doubt about how Kafka Streams works, or at least
> > > > > > we are having some trouble making it work.
> > > > > > 
> > > > > > What we want to achieve is to group, by user, the messages
> > > > > > we receive from a Kafka topic, and to window them so as to
> > > > > > aggregate the messages received within the window (5
> > > > > > minutes). Then I'd like to collect all the aggregates in
> > > > > > each window in order to process them at once, adding them to
> > > > > > a report of all the messages I received in the 5-minute
> > > > > > interval.
> > > > > > 
> > > > > > The last point seems to be the tough part, as Kafka Streams
> > > > > > doesn't seem to provide (at least we can't find it :()
> > > > > > anything that can collect all the window-related data in a
> > > > > > "finite" stream to be processed in one place.
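
The grouping and windowing half of what Bruno describes is expressible
in the DSL; it is the "process the whole window at once" half that has
no built-in counterpart. A sketch of that first half, with hypothetical
UserId and Message types and serde configuration omitted:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Group messages by user and collect them into 5-minute windows.
final StreamsBuilder builder = new StreamsBuilder();

final KTable<Windowed<UserId>, List<Message>> windowedMessages = builder
        .<UserId, Message>stream("messages")
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
        .aggregate(
            ArrayList::new,              // empty list per new window
            (userId, message, list) -> { // append each message
                list.add(message);
                return list;
            });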
> > > > > > 
> > > > > > The file (implemented_code.txt) contains the code we have
> > > > > > implemented, including at least one of our attempts to make
> > > > > > it work.
> > > > > > 
> > > > > > You can find its output in the file (result.txt).
> > > > > > 
> > > > > > For each window there are many log lines, and they are mixed
> > > > > > with those of the other windows.
> > > > > > 
> > > > > > What I'd like to have is something like:
> > > > > > 
> > > > > > // Hypothetical implementation
> > > > > > windowedMessages.streamWindows((interval, window) ->
> > > > > >     process(interval, window));
> > > > > > 
> > > > > > where the process method would be something like:
> > > > > > 
> > > > > > // Hypothetical implementation
> > > > > > void process(Interval interval,
> > > > > >              WindowStream<UserId, List<Message>> windowStream) {
> > > > > >     // Create a report for the whole window
> > > > > >     Report report = new Report(nameFromInterval());
> > > > > >     // Loop over the finite iterable that represents the
> > > > > >     // window content
> > > > > >     for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
> > > > > >         report.addLine(entry.getKey(), entry.getValue());
> > > > > >     }
> > > > > >     report.close();
> > > > > > }
> > > > > > 
> > > > > > 
> > > > 
> > > 
> > > 
> > > 
> > 
> 
> 
> 
