flink-user mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Porting batch percentile computation to streaming window
Date Wed, 31 May 2017 07:50:35 GMT
This is what you are looking for:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#incremental-window-aggregation-with-foldfunction

Cheers,
Gyula

William Saar <william@saar.se> wrote (on Wed, 31 May 2017, 1:36):

> Nice! The solution is actually starting to look quite clean with this in
> place.
>
> Finally, does Flink offer functionality to retrieve information about the
> current window that a rich function is running on? I don't see anything in
> the RuntimeContext classes about the current window...
>
> As you pointed out earlier, I need to attach a window ID (for instance,
> the starting timestamp of the window) to each metric and propagate it to
> the TDigest objects to be able to associate the metrics with the right
> TDigest in the last stateful CoFlatMapFunction. You mentioned I should
> compute the window information in the initial fold function that computes
> the metrics. While I can compute a common window-start timestamp from
> the events in the metrics computation, it would seem less ugly and
> error-prone if I could get information about the current window the fold
> function is running on from Flink.
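
For reference, the "compute a common window-start timestamp from the events" fallback mentioned above can be sketched in plain Java. This is only an illustration with no Flink dependency: the class name `WindowStart` is hypothetical, and it assumes tumbling windows that are aligned to the epoch with no offset.

```java
// Sketch only: window-start arithmetic for epoch-aligned tumbling windows.
// WindowStart is a hypothetical helper, not part of Flink.
final class WindowStart {
    private WindowStart() {}

    /** Inclusive start of the tumbling window of length sizeMs that contains timestampMs. */
    static long of(long timestampMs, long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("sizeMs must be positive");
        }
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // Both events fall into the window starting at 120000 ms.
        System.out.println(WindowStart.of(125_000L, oneMinute));
        System.out.println(WindowStart.of(179_999L, oneMinute));
    }
}
```

Every event of one window maps to the same start value, so that value can serve as the window ID attached to each metric. It only matches the real windows if the window size and alignment used here agree with the window assigner.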
>
>
> ----- Original Message -----
> From: "Gyula Fóra" <gyula.fora@gmail.com>
> To: "William Saar" <william@saar.se>, <user@flink.apache.org>
> Cc:
> Sent: Tue, 30 May 2017 13:56:08 +0000
> Subject: Re: Porting batch percentile computation to streaming window
>
>
> I think you could actually do a window operation to get the tDigestStream
> from windowMetricsByIp:
>
> windowMetricsByIp.windowAll(SameWindowAsTumblingTimeWindow).fold(...)
>
> This way the watermark mechanism should ensure you get all partial results
> before flushing the global window.
>
> Gyula
>
> William Saar <william@saar.se> wrote (on Tue, 30 May 2017, 15:03):
>
>> > This logic now assumes that you get the TDigest result before getting
>> any groupBy metric, which will probably not be the case so you could do
>> some custom buffering in state. Depending on the rate of the stream this
>> might or might not be feasible :)
>>
>> Unfortunately, I think this assumption is a deal-breaker. The value
>> stream is not grouped, but I need to distribute the values to compute the
>> metrics, and I am populating the TDigest with the metrics.
>>
>> Your suggestion gave me some ideas. Assume I have
>> windowMetricsByIp = values.keyBy(ip).window(TumblingTimeWindow).fold(computeMetrics)
>> tDigestStream = windowMetricsByIp.global().flatMap(tDigestMapper)
>>     // How do I know when the flat map has seen all values and should emit its result?
>> percentilesStream = tDigestStream.broadcast().connect(windowMetricsByIp).flatMap
>>
>> If I attach information about the current window to the metrics events on
>> line 1, can I perhaps use that information to make flatMap on line 2 decide
>> when to emit its T-Digest? The crudest solution is to emit the T-Digest for
>> a window when the first event of the next window arrives (will this cause
>> problems with back-pressure?)
>> A less crude option: maybe I can store watermark information or something
>> similar on the metrics objects in line 1 and emit T-Digests more often in
>> line 2?
>>
>> Finally, how do I access the watermark/window information in my fold
>> operation in line 1?
>>
>> Thanks!
>>
>>
>> ----- Original Message -----
>> From: "Gyula Fóra" <gyula.fora@gmail.com>
>> To: "William Saar" <william@saar.se>, <user@flink.apache.org>
>> Cc:
>> Sent: Tue, 30 May 2017 08:56:28 +0000
>> Subject: Re: Porting batch percentile computation to streaming window
>>
>> Hi William,
>>
>> I think the feature you are looking for is side inputs, which are not
>> implemented yet, but let me try to give a workaround that might work.
>>
>> If I understand correctly you have two windowed computations:
>> TDigestStream = allMetrics.windowAll(...).reduce()
>> windowMetricsByIP = allMetrics.keyBy(ip).window(...).reduce()
>>
>> And now you want to join these two by window to compute the percentiles.
>> Something like:
>>
>>
>> TDigestStream.broadcast().connect(windowMetricsByIP).flatMap(JoiningCoFlatMap)
>>
>> In your JoiningCoFlatMap you could keep a state of Map<Window, TDigest>,
>> and every per-IP metric aggregate could pick up the TDigest for the current
>> window. All this assumes that you attach the window information to the
>> aggregate metrics and the TDigest (you can do this in the window reduce
>> step).
>>
>> This logic now assumes that you get the TDigest result before getting any
>> groupBy metric, which will probably not be the case so you could do some
>> custom buffering in state. Depending on the rate of the stream this might
>> or might not be feasible :)
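
A minimal plain-Java sketch of the buffering described above, with no Flink dependency. All names here (`WindowJoinBuffer`, `Metric`, `onDigest`, `onMetric`) are hypothetical, windows are identified by their start timestamp, and a plain `double[]` of the window's values stands in for the TDigest. In a real job the two methods would correspond to the two inputs of the co-flat-map and the maps would live in Flink state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: joins per-IP metrics with a per-window digest, buffering
// metrics that arrive before their window's digest.
final class WindowJoinBuffer {
    static final class Metric {
        final String ip;
        final long windowStart;
        final double value;

        Metric(String ip, long windowStart, double value) {
            this.ip = ip;
            this.windowStart = windowStart;
            this.value = value;
        }
    }

    // Digest stand-in per window, keyed by window start timestamp.
    private final Map<Long, double[]> digests = new HashMap<>();
    // Metrics waiting for the digest of their window.
    private final Map<Long, List<Metric>> pending = new HashMap<>();

    /** Broadcast side: store the digest and flush metrics buffered for that window. */
    List<String> onDigest(long windowStart, double[] values) {
        digests.put(windowStart, values);
        List<String> out = new ArrayList<>();
        List<Metric> buffered = pending.remove(windowStart);
        if (buffered != null) {
            for (Metric m : buffered) {
                out.add(describe(m, values));
            }
        }
        return out;
    }

    /** Per-IP side: emit immediately if the digest is known, otherwise buffer. */
    List<String> onMetric(Metric m) {
        List<String> out = new ArrayList<>();
        double[] values = digests.get(m.windowStart);
        if (values == null) {
            pending.computeIfAbsent(m.windowStart, k -> new ArrayList<>()).add(m);
        } else {
            out.add(describe(m, values));
        }
        return out;
    }

    /** Fraction of the window's values that are less than or equal to value. */
    static double rank(double[] values, double value) {
        if (values.length == 0) {
            return 0.0;
        }
        int count = 0;
        for (double v : values) {
            if (v <= value) {
                count++;
            }
        }
        return (double) count / values.length;
    }

    private static String describe(Metric m, double[] values) {
        return m.ip + "@" + m.windowStart + "=" + rank(values, m.value);
    }
}
```

The sketch never evicts old windows, so both maps grow without bound; a real implementation would need to drop a window's entries once no more metrics for it can arrive.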
>>
>> Does this sound reasonable? I hope I have understood the use-case
>> correctly.
>> Gyula
>>
>>
>> William Saar <william@saar.se> wrote (on Mon, 29 May 2017, 18:34):
>>
>>> I am porting a calculation from Spark batches that uses broadcast
>>> variables to compute percentiles from metrics, and I am curious about
>>> tips on doing this with Flink streaming.
>>>
>>> I have a windowed computation where I am computing metrics for
>>> IP addresses (a windowed stream of metrics objects grouped by
>>> IP address). Now I would like to compute percentiles for each IP from the
>>> metrics.
>>>
>>> My idea is to send all the metrics to a node that computes a global
>>> TDigest and then rejoins the computed global TDigest with the IP-grouped
>>> metrics stream to compute the percentiles for each IP. Is there a neat way
>>> to implement this in Flink?
>>>
>>> I am curious about the best way to join a global value, like our
>>> TDigest, with every result of a grouped window stream. I would also like
>>> to know how to tell when the TDigest is complete and has seen every
>>> element in the window (say, if I implement it in a stateful flatMap that
>>> emits the value after seeing all stream values).
>>>
>>> Thanks!
>>>
>>> William
>>>
>>
