flink-issues mailing list archives

From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #3191: [FLINK-5529] [FLINK-4752] [docs] Improve / extends...
Date Wed, 25 Jan 2017 17:32:13 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3191#discussion_r97832399
  
    --- Diff: docs/dev/windows.md ---
    @@ -622,133 +690,138 @@ input
     </div>
     </div>
     
    -## Dealing with Late Data
    +## Triggers
     
    -When working with event-time windowing it can happen that elements arrive late, i.e the
    -watermark that Flink uses to keep track of the progress of event-time is already past the
    -end timestamp of a window to which an element belongs. Please
    -see [event time](./event_time.html) and especially
    -[late elements](./event_time.html#late-elements) for a more thorough discussion of
    -how Flink deals with event time.
    +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
    +processed by the *window function*. Each `WindowAssigner` comes with a default `Trigger`.
    +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.
     
    -You can specify how a windowed transformation should deal with late elements and how much lateness
    -is allowed. The parameter for this is called *allowed lateness*. This specifies by how much time
    -elements can be late. Elements that arrive within the allowed lateness are still put into windows
    -and are considered when computing window results. If elements arrive after the allowed lateness they
    -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
    -collected once the watermark passes the end of a window plus the allowed lateness.
    +The trigger interface provides five methods that react to different events: 
     
    -<span class="label label-info">Default</span> By default, the allowed lateness is set to
    -`0`. That is, elements that arrive behind the watermark will be dropped.
    +* The `onElement()` method is called for each element that is added to a window.
    +* The `onEventTime()` method is called when a registered event-time timer fires.
    +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
    +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, *e.g.* when using session windows.
    +* Finally, the `clear()` method performs any action needed upon removal of the corresponding window.
     
    -You can specify an allowed lateness like this:
    +Any of these methods can be used to register processing- or event-time timers for future actions.
     
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +### Fire and Purge
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .allowedLateness(<time>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    -</div>
    +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`,
    +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
    +Windows with `ReduceFunction` or `FoldFunction` simply emit their eagerly aggregated result.
     
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
    +By default, the pre-implemented triggers simply `FIRE` without purging the window state.
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .allowedLateness(<time>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    -</div>
    -</div>
    +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.
     
    -<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
    -data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
    +### Default Triggers of WindowAssigners
     
    -## Triggers
    +The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
    +default trigger. This trigger simply fires once the watermark passes the end of a window.
     
    -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
    -processed by the *window function*. The trigger observes how elements are added to windows
    -and can also keep track of the progress of processing time and event time. Once a trigger
    -determines that a window is ready for processing, it fires. This is the signal for the
    -window operation to take the elements that are currently in the window and pass them along to
    -the window function to produce output for the firing window.
    +<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindows` assigner is the `NeverTrigger`, which never fires. Consequently, you always have to define a custom trigger when using `GlobalWindows`.
     
    -Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
    -appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
    -default trigger. This trigger simply fires once the watermark passes the end of a window.
    +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
    +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
    +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
    +progress of time but only by count. Right now, you have to write your own custom trigger if
    +you want to react based on both time and count.
     
    -You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
    -whole specification of the windowed transformation would then look like this:
    +### Built-in and Custom Triggers
     
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +Flink comes with a few built-in triggers. 
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .trigger(<trigger>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    -</div>
    +* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
    +* The `ProcessingTimeTrigger` fires based on processing time.
    +* The `CountTrigger` fires once the number of elements in a window reaches the given limit.
    +* The `PurgingTrigger` takes as argument another trigger and transforms it into a purging one.
     
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +If you need to implement a custom trigger, you should check out the abstract {% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger" %} class. Please note that the API is still evolving and might change in future versions of Flink.
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .trigger(<trigger>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    -</div>
    -</div>
     
    -Flink comes with a few triggers out-of-box: there is the already mentioned `EventTimeTrigger` that
    -fires based on the progress of event-time as measured by the watermark, the `ProcessingTimeTrigger`
    -does the same but based on processing time and the `CountTrigger` fires once the number of elements
    -in a window exceeds the given limit.
    +## Evictors
     
    -<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
    -are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
    -`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
    -progress of time but only by count. Right now, you have to write your own custom trigger if
    -you want to react based on both time and count.
    +Flink's windowing model allows specifying an optional `Evictor` in addition to the `WindowAssigner` and the `Trigger`.
    +This can be done using the `evictor(...)` method (shown in the beginning of this document). The evictor has the ability
    +to remove elements from a window *after* the trigger fires and *before and/or after* the window function is applied.
    +To do so, the `Evictor` interface has two methods:
    +
    +    /**
    +     * Optionally evicts elements. Called before windowing function.
    +     *
    +     * @param elements The elements currently in the pane.
    +     * @param size The current number of elements in the pane.
    +     * @param window The {@link Window}
    +     * @param evictorContext The context for the Evictor
    +     */
    +    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    +
    +    /**
    +     * Optionally evicts elements. Called after windowing function.
    +     *
    +     * @param elements The elements currently in the pane.
    +     * @param size The current number of elements in the pane.
    +     * @param window The {@link Window}
    +     * @param evictorContext The context for the Evictor
    +     */
    +    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    +
    +The `evictBefore()` method contains the eviction logic to be applied before the window function, while the `evictAfter()` method
    +contains the logic to be applied after the window function. Elements evicted before the application of the window
    +function will not be processed by it.
    +
    +Flink comes with three pre-implemented evictors. These are:
    +
    +* `CountEvictor`: keeps up to a user-specified number of elements from the window and discards the remaining ones from
    +the beginning of the window buffer.
    +* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the delta between the last element in the
    +window buffer and each of the remaining ones, and removes the ones with a delta greater than or equal to the threshold.
    +* `TimeEvictor`: takes as argument an `interval` in milliseconds and, for a given window, finds the maximum
    +timestamp `max_ts` among its elements and removes all the elements with timestamps smaller than `max_ts - interval`.
     
    -The internal `Trigger` API is still considered experimental but you can check out the code
    -if you want to write your own custom trigger:
    -{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
    +<span class="label label-info">Default</span> By default, all the pre-implemented evictors apply their logic before the
    +window function.
     
    -## Non-keyed Windowing
    +<span class="label label-danger">Attention</span> Specifying an evictor prevents any pre-aggregation, as all the
    +elements of a window have to be passed to the evictor before applying the computation.
     
    -You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however,
    -that Flink cannot process windows for different keys in parallel, essentially turning the
    -transformation into a non-parallel operation.
    +<span class="label label-danger">Attention</span> Flink provides no guarantees about the order of the elements within
    +a window. This implies that although an evictor may remove elements from the beginning of the window, these are not
    +necessarily the ones that arrive first or last.
     
    -<span class="label label-danger">Warning</span> As mentioned in the introduction, non-keyed
    -windows have the disadvantage that work cannot be distributed in the cluster because
    -windows cannot be computed independently per key. This can have severe performance implications.
     
    +## Allowed Lateness
     
    -The basic structure of a non-keyed windowed transformation is as follows:
    +When working with *event-time* windowing it can happen that elements arrive late, *i.e.* the watermark that Flink uses to
    +keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
    +[event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
    +discussion of how Flink deals with event time.
    +
    +By default, late elements are dropped if their associated window was already evaluated. However,
    --- End diff --
    
    They are not dropped when the window was already evaluated but when the watermark is past the end of the window plus the allowed lateness.
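
A simplified, hypothetical model of the drop rule stated in the comment above may help. It assumes a window covering `[start, end)` whose last valid timestamp is `end - 1` (as returned by Flink's `TimeWindow.maxTimestamp()`), and treats an element as dropped once the watermark reaches that timestamp plus the allowed lateness. `AllowedLatenessSketch`, `WindowBounds` and `isDropped` are illustrative names, not Flink API, and the exact boundary (`<=`) is an assumption.

```java
// Hypothetical sketch, not Flink API: models when a late element would be dropped.
public class AllowedLatenessSketch {

    /** A time window covering [start, end) in event-time milliseconds (illustrative). */
    static final class WindowBounds {
        final long start;
        final long end;

        WindowBounds(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Largest timestamp that still belongs to the window. */
        long maxTimestamp() {
            return end - 1;
        }
    }

    /**
     * Returns true if an element for this window would be discarded, i.e. the
     * watermark has reached the window's last timestamp plus the allowed lateness.
     * Assumes allowedLateness >= 0. Whether the window already fired is deliberately
     * not an input: it plays no role in the decision.
     */
    static boolean isDropped(WindowBounds window, long allowedLateness, long currentWatermark) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        // Saturate on overflow, e.g. for a window ending at Long.MAX_VALUE.
        if (cleanupTime < window.maxTimestamp()) {
            cleanupTime = Long.MAX_VALUE;
        }
        return cleanupTime <= currentWatermark;
    }

    public static void main(String[] args) {
        WindowBounds window = new WindowBounds(0L, 10_000L); // [0 s, 10 s)
        long allowedLateness = 5_000L;                       // 5 s

        // Watermark already past the window end (the window has typically fired),
        // but an element arriving now is still within the allowed lateness.
        System.out.println(isDropped(window, allowedLateness, 10_000L)); // false

        // Watermark past end + allowed lateness: the element is dropped.
        System.out.println(isDropped(window, allowedLateness, 15_000L)); // true
    }
}
```

In this model, only the watermark, the window end and the allowed lateness decide whether an element is dropped; a previous evaluation of the window does not.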

