flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Associative operation + windowAll - possible parallelism
Date Wed, 12 Jul 2017 09:41:27 GMT
Yes, your observations are correct! 

Currently, I see two possible solutions that you could implement as a user:

1. Use .window() with a dummy key followed by a .windowAll():

DataStream<T> input = …;
input
  .map( (in) -> new Tuple2<Integer, T>(<rand int>, in)) 
  .keyBy(0)
  .window(…)
  .aggregate(…)
  .windowAll(…)
  .aggregate(…)

The problem here is that you incur a shuffle, which may or may not improve performance, depending
on the aggregation operation.
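
To see why the result is unchanged, here is a minimal plain-Java sketch of the idea. It is not Flink code: the class and method names are made up for illustration, and the contents of one window are modelled as a simple list. Elements are scattered over a few dummy keys, aggregated per key, and the partial results are then merged. Because the random scatter reorders elements, this assumes the merge function is commutative as well as associative (true for sum, max, and similar).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BinaryOperator;

// Hypothetical illustration only; none of these names exist in Flink.
final class TwoPhaseAggregationSketch {

    /** Single pass over the window: what one non-parallel windowAll instance computes. */
    static <A> A singlePhase(List<A> windowContents, A identity, BinaryOperator<A> merge) {
        A acc = identity;
        for (A value : windowContents) {
            acc = merge.apply(acc, value);
        }
        return acc;
    }

    /** Scatter over numKeys dummy keys, aggregate per key, then merge the partials. */
    static <A> A twoPhase(List<A> windowContents, int numKeys, long seed,
                          A identity, BinaryOperator<A> merge) {
        Random random = new Random(seed);
        Map<Integer, A> partials = new HashMap<>();
        for (A value : windowContents) {
            // Stands in for the random dummy key added by the map() step.
            int dummyKey = random.nextInt(numKeys);
            // Phase 1: keyed pre-aggregation, one partial result per dummy key.
            partials.merge(dummyKey, value, merge);
        }
        // Phase 2: the final, non-parallel merge of at most numKeys partials.
        A result = identity;
        for (A partial : partials.values()) {
            result = merge.apply(result, partial);
        }
        return result;
    }
}
```

For example, `TwoPhaseAggregationSketch.twoPhase(values, 4, 42L, 0, Integer::sum)` returns the same sum as `singlePhase(values, 0, Integer::sum)`, but the final step only has to combine at most four partial values.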

To get around that shuffle you would have to use option 2.

2. Use a custom StreamOperator that does the pre-aggregation and emits results based on
event time, followed by .windowAll():

DataStream<T> input = …;
input
  .transform(<name>, <type info>, new PreAggregationOperator()) 
  .windowAll(…)
  .aggregate(…)

Here, PreAggregationOperator would pre-aggregate incoming elements, checkpoint the
pre-aggregated values, and emit the pre-aggregate when the watermark for the end of a window
arrives. You have to use a custom operator because a user function cannot “listen” for
watermarks and therefore would not be able to emit the aggregate at the right time.
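
The core logic of such an operator could look roughly like the following plain-Java sketch. It deliberately does not use Flink's operator interfaces: the class name, the two methods, the fixed tumbling window size, and the choice of a sum as the aggregate are all assumptions made for illustration. In a real operator the map of partial sums would be the state you checkpoint, and the two methods would correspond to the operator's element and watermark callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is not a Flink StreamOperator.
final class PreAggregationSketch {
    private final long windowSizeMs;
    // Window end (exclusive) -> partial sum. A real operator would checkpoint this.
    private final TreeMap<Long, Long> partialSums = new TreeMap<>();

    PreAggregationSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Adds one element to the partial aggregate of its tumbling window. */
    void processElement(long timestampMs, long value) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        partialSums.merge(windowEnd, value, Long::sum);
    }

    /**
     * Emits {timestamp, partialSum} for every window the watermark has passed.
     * A window [start, end) counts as complete once the watermark reaches end - 1,
     * and the partial is stamped with end - 1 so that a downstream windowAll
     * with the same window size assigns it to the same window.
     */
    List<long[]> processWatermark(long watermarkMs) {
        List<long[]> emitted = new ArrayList<>();
        while (!partialSums.isEmpty() && partialSums.firstKey() - 1 <= watermarkMs) {
            Map.Entry<Long, Long> done = partialSums.pollFirstEntry();
            emitted.add(new long[] {done.getKey() - 1, done.getValue()});
        }
        return emitted;
    }
}
```

Each parallel instance then emits a single record per window, so the downstream .windowAll() only has to merge as many values as there are upstream instances.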

I hope this helps.

Best,
Aljoscha

> On 11. Jul 2017, at 22:05, Debski <a.debski@avsystem.com> wrote:
> 
> Let us assume that I want to perform some kind of aggregation in specified
> time windows (e.g. a tumbling window of 1 minute) and my aggregation operation
> is associative. Wouldn't it be possible to represent windowAll at runtime as
> /parallelism + 1/ operator instances, where /parallelism/ operator instances
> compute partial aggregates and the partial results are then merged into one in
> the last instance of the operator, using the merge function that is present in
> AggregateFunction?
> 
> Basically, I would like to compute a single aggregated value for all events in
> a given time window, and the aggregation operation itself can be parallelized.
> 
> For example, I could have a stream mapped with a .map operation that has
> parallelism 4; each map operator instance would then pass 1/4 of the events to
> an adjacent instance of the windowAll operator that would compute the desired
> aggregate over its subset of events. When the window is closed, all partial
> states would be transferred to a single windowAll merging operator.
> 
> Are there any plans to support such situations, or is it possible to somehow
> implement such an operator in the current version of Flink?
> 
> Also, there is a note in the windowAll Javadoc about possible parallelism, but I
> don't know how relevant it is to my case:
> 
> Note: This operation can be inherently non-parallel since all elements have
> to pass through the same operator instance. (Only for special cases, such as
> aligned time windows is it possible to perform this operation in parallel).
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Associative-operation-windowAll-possible-parallelism-tp14187.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.

