kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: Use Kafka Streams for windowing data and processing each window at once
Date Thu, 19 Jul 2018 16:59:02 GMT
Hi Bruno,

What you are asking is a common request.  There is a KIP in the works,
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
that should suit the requirements you've outlined.

In the meantime, I'll see if I can come up with an alternative approach
over the next few days.
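
Until KIP-328 is available, one possible direction is to buffer records
per window and hand each window over exactly once, after it has closed.
The following is only a minimal plain-Java sketch of that idea, not Kafka
Streams code. The class WindowBuffer, its add method, and the use of
String user ids and messages are all hypothetical, and it assumes
5-minute tumbling windows keyed by record timestamp, with late records
dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, NOT part of Kafka Streams: buffers records per
// 5-minute tumbling window and releases each window exactly once,
// after a record with a later timestamp shows the window has closed.
class WindowBuffer {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    // windowStart -> (userId -> messages), ordered by window start
    private final TreeMap<Long, Map<String, List<String>>> open = new TreeMap<>();
    // Highest record timestamp seen so far ("stream time")
    private long streamTime = Long.MIN_VALUE;
    // Windows starting before this bound have already been emitted
    private long closedBefore = Long.MIN_VALUE;

    // Adds one record and returns every window closed by its timestamp.
    // Records belonging to already-emitted windows are dropped.
    TreeMap<Long, Map<String, List<String>>> add(String userId, String message, long timestampMs) {
        long windowStart = Math.floorDiv(timestampMs, WINDOW_MS) * WINDOW_MS;
        streamTime = Math.max(streamTime, timestampMs);
        if (windowStart >= closedBefore) {
            open.computeIfAbsent(windowStart, k -> new LinkedHashMap<>())
                .computeIfAbsent(userId, k -> new ArrayList<>())
                .add(message);
        }
        // Every window that starts before the current window is complete.
        long currentWindowStart = Math.floorDiv(streamTime, WINDOW_MS) * WINDOW_MS;
        return flush(currentWindowStart);
    }

    // Removes and returns all buffered windows that start before `bound`.
    private TreeMap<Long, Map<String, List<String>>> flush(long bound) {
        TreeMap<Long, Map<String, List<String>>> closed =
            new TreeMap<>(open.headMap(bound, false));
        open.headMap(bound, false).clear();
        closedBefore = Math.max(closedBefore, bound);
        return closed;
    }
}
```

Each map returned by add is a finite, complete window, so a
process(interval, window) step like the one in the quoted message could
iterate over it and write one report per window. Windows that are still
open when input ends are never emitted by this sketch.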

-Bill

On Thu, Jul 19, 2018 at 12:07 PM Bruno Bottazzini <
bruno.bottazzini@targatelematics.com> wrote:

> Hello,
>
> We have a question about how Kafka Streams works, or at least we are
> having some trouble making it work.
>
> What we want to achieve is to group the messages we receive from a
> Kafka topic by user and window them, so that we can aggregate the
> messages received in each window (5 minutes). Then we'd like to
> collect all the aggregates of each window and process them at once,
> adding them to a report of all the messages received in that 5-minute
> interval.
>
> The last point seems to be the tough part as Kafka Streams doesn't seem
> to provide (at least we can't find it :() anything that can collect all
> the window related stuff in a "finite" stream to be processed in one
> place.
>
> The file (implemented_code.txt) contains the code we have implemented,
> including at least one of our attempts to make this work.
>
> Its output is in the file (result.txt).
>
> For each window there are many log lines, and they are mixed with those
> of the other windows.
>
> What I'd like to have is something like:
>
> // Hypothetical implementation
> windowedMessages.streamWindows((interval, window) -> process(interval,
> window));
>
> where method process would be something like:
>
> // Hypothetical implementation
> void process(Interval interval,
>              WindowStream<UserId, List<Message>> windowStream) {
>     // Create report for the whole window
>     Report report = new Report(nameFromInterval(interval));
>     // Loop on the finite iterable that represents the window content
>     for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
>         report.addLine(entry.getKey(), entry.getValue());
>     }
>     report.close();
> }
>
>
