flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4062) Update Windowing Documentation
Date Fri, 01 Jul 2016 08:18:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358630#comment-15358630 ] 

ASF GitHub Bot commented on FLINK-4062:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2154#discussion_r69264695
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -24,1023 +24,608 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    +Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
    +slices based on the timestamps of elements or other criteria. This division is required when working
    +with infinite streams of data and performing transformations that aggregate elements.
    +
    +<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
    +windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
    +subdivided based on both window and key before being given to
    +a user function. The work can thus be distributed across the cluster
    +because the elements for different keys can be processed independently. If you absolutely have to,
    +you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
    +windows work.
    +
     * This will be replaced by the TOC
     {:toc}
     
    -## Windows on Keyed Data Streams
    -
    -Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
    -i.e., each window will contain elements with the same key value.
    +## Basics
     
    -### Basic Window Constructs
    +For a windowed transformation you must at least specify a *key*
    +(see [specifying keys]({{ site.baseurl }}/apis/common/index.html#specifying-keys)),
    +a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed stream
    +into logical keyed streams, while the *window assigner* assigns elements to finite per-key windows.
    +Finally, the *window function* is used to process the elements of each window.
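To make the three ingredients concrete, here is a minimal plain-Java simulation of what a keyed, windowed sum computes over a small, finite list of events. It does not use Flink. `Event`, `sumPerKeyAndWindow` and the `key@windowStart` labels are hypothetical names, and the window assigner is assumed to be a tumbling window of fixed size aligned to timestamp 0.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: plain Java that mimics keyed windowing, NOT the Flink API.
// Event, sumPerKeyAndWindow and the "key@windowStart" labels are hypothetical names.
class KeyedWindowSketch {

    /** Hypothetical input element: a key, an event timestamp (ms) and a value. */
    static final class Event {
        final String key;
        final long timestamp;
        final long value;

        Event(String key, long timestamp, long value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Sums values per key and per tumbling window of the given size (ms), aligned to 0. */
    static Map<String, Long> sumPerKeyAndWindow(List<Event> events, long size) {
        Map<String, Long> result = new TreeMap<>();
        for (Event e : events) {
            // Key: selects the logical per-key stream the element belongs to.
            // Window assigner: maps the timestamp to the start of its tumbling window.
            long windowStart = e.timestamp - (e.timestamp % size);
            String slot = e.key + "@" + windowStart;
            // Window function: incrementally combine (sum) values of the same key and window.
            result.merge(slot, e.value, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("user1", 1_000L, 1L),
                new Event("user1", 4_000L, 2L),
                new Event("user2", 2_000L, 5L),
                new Event("user1", 6_000L, 3L));
        System.out.println(sumPerKeyAndWindow(events, 5_000L)); // {user1@0=3, user1@5000=3, user2@0=5}
    }
}
```

In a Flink program the same three roles are played by `keyBy()`, `window()` and the window function, and the computation runs continuously on an unbounded stream instead of over a finished list.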
     
    -Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
    -for common use cases. See first if your use case can be served by the pre-defined windows below before moving
    -to defining your own windows.
    +The basic structure of a windowed transformation is thus as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<T> input = ...;
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that elements are
    -          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
    -	  The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5));
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
    -             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -      {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight java %}
    -keyedStream.countWindow(1000);
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows overlap by at most 900 elements).
    -  {% highlight java %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>);
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[T] = ...
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that elements are
    -          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
    -          The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5))
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
    -             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -      {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight scala %}
    -keyedStream.countWindow(1000)
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows overlap by at most 900 elements).
    -  {% highlight scala %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>)
    +{% endhighlight %}
     </div>
     </div>
     
    -### Advanced Window Constructs
    +We will cover [window assigners](#window-assigners) in a separate section below.
    +
    +The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
    +`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
    +of specifying a windowed transformation in detail in [window functions](#window-functions).
    +
    +For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
    +is considered *ready for processing*. Triggers are covered in more detail in
    +[triggers](#triggers).
    +
    +## Window Assigners
    +
    +The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
    +with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
    +*sliding windows*, *session windows* and *global windows*, but you can also implement your own by
    +extending the `WindowAssigner` class. All the built-in window assigners, except for the global
    +windows assigner, assign elements to windows based on time, which can either be processing time or event
    +time.
    +
    +Let's first look at how each of these window assigners works before looking at how they can be used
    +in a Flink program. We will be using abstract figures to visualize the workings of each assigner.
    +In the following figures, the purple circles are elements of the stream; they are partitioned
    +by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
    +of time.
    +
    +### Global Windows
    +
    +Global windows are a way of specifying that we don't want to subdivide our elements into windows.
    +Each element is assigned to one single per-key *global window*.
    +This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
    +no computation is ever going to be performed, as the global window does not have a natural end at
    +which we could process the aggregated elements.
    +
    +<img src="non-windowed.svg" class="center" style="width: 80%;" />
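The following plain-Java sketch (not the Flink API) models why a global window needs a trigger: every element of a key lands in the same window, and only a hypothetical rule "fire and clear the window after every `n` elements" ever produces output. `GlobalWindowSketch`, `add`, `results` and `pending` are illustrative names, and the summing step stands in for an arbitrary window function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: plain Java, NOT the Flink API.
// One global window per key; a hypothetical trigger fires and clears the window every n elements.
class GlobalWindowSketch {
    private final int n;
    private final Map<String, List<Long>> windows = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    GlobalWindowSketch(int n) {
        this.n = n;
    }

    void add(String key, long value) {
        // Every element of a key goes into the same (global) window for that key.
        List<Long> window = windows.computeIfAbsent(key, k -> new ArrayList<>());
        window.add(value);
        // Trigger: without this check the window would keep growing and never be processed.
        if (window.size() >= n) {
            long sum = 0;
            for (long v : window) {
                sum += v;
            }
            results.add(key + "=" + sum);
            window.clear(); // clear so the next n elements start from an empty window
        }
    }

    List<String> results() {
        return results;
    }

    /** Number of elements of a key that are buffered but not yet processed. */
    int pending(String key) {
        List<Long> window = windows.get(key);
        return window == null ? 0 : window.size();
    }

    public static void main(String[] args) {
        GlobalWindowSketch sketch = new GlobalWindowSketch(2);
        sketch.add("user1", 1L);
        sketch.add("user1", 2L);
        sketch.add("user2", 5L);
        sketch.add("user1", 3L);
        System.out.println(sketch.results()); // [user1=3]
        System.out.println(sketch.pending("user1") + " " + sketch.pending("user2")); // 1 1
    }
}
```

Removing the `if (window.size() >= n)` block leaves a program that only ever buffers, which is the situation the text describes for global windows without a custom trigger.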
    +
    +### Tumbling Windows
    +
    +A *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
    +specified *window size*. For example, if you specify a window size of 5 minutes, the window
    +function will get 5 minutes' worth of elements in each invocation.
     
    -The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
    -below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
    -but the execution of the window function is triggered when 100 elements have been added to the
    -window, and every time execution is triggered, 10 elements are retained in the window:
    +<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
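As a rough illustration, assuming windows are aligned to timestamp 0 and timestamps are non-negative milliseconds, the tumbling window that contains an element can be computed from its timestamp alone. This is a plain-Java sketch with hypothetical names (`TumblingWindowSketch`, `windowStart`, `windowEnd`), not Flink's implementation.

```java
// Illustrative sketch only: plain Java, NOT the Flink API.
// Assumes windows aligned to timestamp 0 (no offset) and non-negative timestamps in milliseconds.
class TumblingWindowSketch {

    /** Start (inclusive) of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5-minute windows, in milliseconds
        long[] timestamps = {0L, 299_999L, 300_000L, 720_000L};
        for (long ts : timestamps) {
            // Each timestamp maps to exactly one [start, end) window.
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because each timestamp maps to exactly one start value, every element belongs to exactly one window, which is what makes tumbling windows non-overlapping.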
    +
    +### Sliding Windows
    +
    +The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
    +like the tumbling windows assigner, but in this case windows can overlap. The additional
    +user-specified parameter *window slide* controls how frequently a new window is started; if the
    +slide is smaller than the window size, consecutive windows overlap by the difference. As windows
    +overlap, an element can be assigned to multiple windows.
    +
    +For example, you could have windows of size 10 minutes that slide by 5 minutes. With this, you get 10
    +minutes' worth of elements in each invocation of the window function, and it will be invoked for every
    +5 minutes of data.
    +
    +<img src="sliding-windows.svg" class="center" style="width: 80%;" />
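The following plain-Java sketch lists every sliding window an element falls into. It assumes non-negative timestamps in milliseconds, windows aligned to timestamp 0 and half-open `[start, end)` intervals; `SlidingWindowSketch` and `assignWindows` are hypothetical names, not Flink classes. With the 10-minute size and 5-minute slide from the example above, every element lands in two windows.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: plain Java, NOT the Flink API.
// Assumes non-negative timestamps in ms, windows aligned to 0, half-open [start, end) intervals.
class SlidingWindowSketch {

    /** Returns every sliding window [start, end) that contains the timestamp, latest start first. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Step back one slide at a time while the window still covers the timestamp.
        // (For timestamps close to 0 this also yields windows that start before 0.)
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Size 10 minutes, slide 5 minutes, element at minute 12.
        for (long[] w : assignWindows(12 * minute, 10 * minute, 5 * minute)) {
            System.out.println("[" + w[0] / minute + "min, " + w[1] / minute + "min)");
        }
    }
}
```

When the size is a multiple of the slide, each element is assigned to `size / slide` windows, so a small slide relative to the size multiplies the number of windows each element has to be added to.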
    +
    +### Session Windows
    +
    +The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the
    +incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
    +that start at fixed time points and have a fixed *window size*. With session windows it is possible
    +to have windows that start at individual points in time for each key and that end once there has
    +been a certain period of inactivity. The configuration parameter is the *session gap* that specifies
    +how long to wait for new data before considering a session as closed.
    +
    +<img src="session-windows.svg" class="center" style="width: 80%;" />
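A minimal plain-Java sketch of the session idea for a single key, assuming a session ends once no element has arrived for `gap` and modelling each session as `[first timestamp, last timestamp + gap)`. It works on a finished array that it sorts first, whereas a streaming system has to build and merge sessions incrementally as elements arrive; `SessionWindowSketch` and `sessions` are hypothetical names, not the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: plain Java, NOT the Flink API. Processes the timestamps of ONE key.
class SessionWindowSketch {

    /** Groups timestamps into sessions; each session is returned as [start, end) with end = last + gap. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            if (current == null || ts >= current[1]) {
                // First element, or no element arrived for at least `gap`: start a new session.
                current = new long[] {ts, ts + gap};
                result.add(current);
            } else {
                // Element arrives before the session timed out: extend the session.
                current[1] = ts + gap;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {30L, 0L, 14L, 5L};
        for (long[] session : sessions(timestamps, 10L)) {
            System.out.println("[" + session[0] + ", " + session[1] + ")");
        }
    }
}
```

With a gap of 10, the elements at 0, 5 and 14 form one session because each arrives less than 10 after its predecessor, while 30 starts a new one; this is how intermediate elements can chain a session together.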
    +
    +### Specifying a Window Assigner
    +
    +The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
    +windowing and one for event-time windowing. The processing-time assigners assign elements to
    +windows based on the current clock of the worker machines, while the event-time assigners assign
    +elements to windows based on their timestamps. Please have a look at
    +[event time]({{ site.baseurl }}/apis/streaming/event_time.html) to learn about the difference between processing time
    +and event time and about how timestamps can be assigned to elements.
    +
    +The following code snippets show how each of the window assigners can be used in a program:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10));
    +DataStream<T> input = ...;
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10))
    +val input: DataStream[T] = ...
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>)
     {% endhighlight %}
     </div>
     </div>
     
    -The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally),
    -and (3) an `Evictor` (optionally).
    +## Window Functions
     
    -The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements
    -that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
    -to some notion of time described above within these values are part of the window).
    +The *window function* is used to process the elements of each window (and key) once the system
    +determines that a window is ready for processing (see [triggers](#triggers) for how the system
    +determines when a window is ready).
     
    -For example, the `SlidingEventTimeWindows`
    -assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
    -time starts from 0 and is measured in milliseconds. Then, we have 6 windows
    -that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
    -element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be
    -assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your
    -own window types by extending the `WindowAssigner` class.
    +The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The former
    +two can be executed more efficiently because Flink can incrementally aggregate the elements for each
    +window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a
    +window and additional meta information about the window to which the elements belong.
     
    -<div class="codetabs" markdown="1">
    +A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other
    +cases because Flink has to buffer all elements for a window internally before invoking the function.
    +This can be mitigated by combining a `WindowFunction` with a `ReduceFunction` or `FoldFunction` to
    +get both incremental aggregation of window elements and the additional information that the
    +`WindowFunction` receives. We will look at examples for each of these variants.
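The efficiency difference can be sketched in plain Java: `IncrementalWindow` keeps a single accumulator per window and applies a reduce-style function as each element arrives, whereas `BufferingWindow` stores every element and hands them all to the function when the window fires. Both classes are hypothetical and only model the idea; they are not how Flink implements window state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Illustrative sketch only: plain Java, NOT Flink's internal implementation.
class WindowEvaluationSketch {

    /** Incremental evaluation: one accumulator per window, updated as each element arrives. */
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reduceFunction;
        private T accumulator; // null until the first element arrives

        IncrementalWindow(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void add(T element) {
            accumulator = (accumulator == null) ? element : reduceFunction.apply(accumulator, element);
        }

        T fire() {
            return accumulator;
        }
    }

    /** Buffered evaluation: all elements are kept until the window fires. */
    static final class BufferingWindow<T, R> {
        private final Function<Iterable<T>, R> windowFunction;
        private final List<T> buffer = new ArrayList<>();

        BufferingWindow(Function<Iterable<T>, R> windowFunction) {
            this.windowFunction = windowFunction;
        }

        void add(T element) {
            buffer.add(element);
        }

        int bufferedElements() {
            return buffer.size();
        }

        R fire() {
            return windowFunction.apply(buffer);
        }
    }

    public static void main(String[] args) {
        IncrementalWindow<Long> incremental = new IncrementalWindow<>(Long::sum);
        BufferingWindow<Long, Long> buffered = new BufferingWindow<>(elements -> {
            long sum = 0;
            for (long v : elements) {
                sum += v;
            }
            return sum;
        });
        for (long v = 1; v <= 1_000; v++) {
            incremental.add(v);
            buffered.add(v);
        }
        // Same result, but only the buffered variant had to keep all 1000 elements around.
        System.out.println(incremental.fire() + " " + buffered.fire() + " " + buffered.bufferedElements());
    }
}
```

Both variants compute the same sum; the difference is that the buffered one holds state proportional to the number of elements in the window, while the incremental one holds a single value.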
    +
    +### ReduceFunction
    +
    +A reduce function specifies how two values can be combined to form one element. Flink can use this
    +to incrementally aggregate the elements in a window.
     
    +A `ReduceFunction` can be used in a program like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Global window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -	    All incoming elements of a given key are assigned to the same window.
    -	    The window does not contain a default trigger, hence it will never be triggered
    -	    if a trigger is not explicitly specified.
    -          </p>
    -    {% highlight java %}
    -stream.window(GlobalWindows.create());
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (1 second below) based on
    -            their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -            This assigner comes with a default trigger that fires for a window when a
    -            watermark with value higher than its end-value is received.
    -          </p>
    -      {% highlight java %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
    -      {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window when a
    -	          watermark with value higher than its end-value is received.
    -          </p>
    -    {% highlight java %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -              Incoming elements are assigned to a window of a certain size (1 second below) based on
    -              the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -              This assigner comes with a default trigger that fires for a window a window when the current
    -              processing time exceeds its end-value.
    -            </p>
    -      {% highlight java %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window a window when the current
    -            processing time exceeds its end-value.
    -          </p>
    -    {% highlight java %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -          <tr>
    -        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below).
    -            Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are
    -            consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements
    -            can be connected into a session by intermediate elements.
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -       <tr>
    -        <td><strong>Processing time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -           This is similar to event-time session windows but works on the current processing
    -           time instead of the timestamp of elements
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
    +      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
    +        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    +      }
    +    });
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Global window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            All incoming elements of a given key are assigned to the same window.
    -	    The window does not contain a default trigger, hence it will never be triggered
    -	    if a trigger is not explicitly specified.
    -          </p>
    -    {% highlight scala %}
    -stream.window(GlobalWindows.create)
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Incoming elements are assigned to a window of a certain size (1 second below) based on
    -            their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -            This assigner comes with a default trigger that fires for a window when a
    -            watermark with value higher than its end-value is received.
    -            </p>
    -      {% highlight scala %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window when a
    -            watermark with value higher than its end-value is received.
    -          </p>
    -    {% highlight scala %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -              Incoming elements are assigned to a window of a certain size (1 second below) based on
    -              the current processing time. Windows do not overlap, i.e., each element is assigned to exactly one window.
    -              This assigner comes with a default trigger that fires for a window a window when the current
    -              processing time exceeds its end-value.
    -
    -            </p>
    -      {% highlight scala %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding processing time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size (5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires for a window a window when the current
    -            processing time exceeds its end-value.
    -          </p>
    -    {% highlight scala %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -         <tr>
    -        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below).
    -            Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are
    -            consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements
    -            can be connected into a session by intermediate elements.
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -       <tr>
    -        <td><strong>Processing time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -           This is similar to event-time session windows but works on the current processing
    -           time instead of the timestamp of elements
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    -</div>
    +{% highlight scala %}
    +val input: DataStream[(String, Long)] = ...
     
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
    +{% endhighlight %}
    +</div>
     </div>
     
    -The `Trigger` specifies when the function that comes after the window clause (e.g., `sum`, `count`) is evaluated ("fires")
    -for each window. If a trigger is not specified, a default trigger for each window type is used (that is part of the
    -definition of the `WindowAssigner`). Flink comes bundled with a set of triggers if the ones that windows use by
    -default do not fit the application. You can write your own trigger by implementing the `Trigger` interface. Note that
    -specifying a trigger will override the default trigger of the window assigner.
    +A `ReduceFunction` specifies how two elements from the input can be combined to produce
    +an output element. This example will sum up the second field of the tuple for all elements
    +in a window.
     
    -<div class="codetabs" markdown="1">
    +### FoldFunction
     
    +A fold function can be specified like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -    <td><strong>Processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when the current processing time exceeds its end-value.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ProcessingTimeTrigger.create());
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Watermark trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
     {% highlight java %}
    -windowedStream.trigger(EventTimeTrigger.create());
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
    +       public String fold(String acc, Tuple2<String, Long> value) {
    +         return acc + value.f1;
    +       }
    +    });
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        The window is actually fired only when the current processing time exceeds its end-value.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous watermark time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        A window is actually fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[(String, Long)] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .fold("") { (acc, v) => acc + v._2 }
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Count trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when it has more than a certain number of elements (1000 below).
    -        The elements of the triggered window are retained.
    -      </p>
    +</div>
    +</div>
    +
    +A `FoldFunction` specifies how elements from the input will be added to an initial
    +accumulator value (`""`, the empty string, in our example). This example will compute
    +a concatenation of all the `Long` fields of the input.
    +
    +### WindowFunction - The Generic Case
    +
    +Using a `WindowFunction` provides the most flexibility, at the cost of performance. The reason for this
    +is that elements cannot be incrementally aggregated for a window and instead need to be buffered
    +internally until the window is considered ready for processing. A `WindowFunction` gets an
    +`Iterable` containing all the elements of the window being processed. The signature of
    +`WindowFunction` is this:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
     {% highlight java %}
    -windowedStream.trigger(CountTrigger.of(1000));
    +public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    +
    +  /**
    +   * Evaluates the window and outputs none or several elements.
    +   *
    +   * @param key The key for which this window is evaluated.
    +   * @param window The window that is being evaluated.
    +   * @param input The elements in the window being evaluated.
    +   * @param out A collector for emitting elements.
    +   *
    +   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    +   */
    +  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Purging trigger</strong></td>
    -    <td>
    -      <p>
    -        Takes any trigger as an argument and forces the triggered window elements to be
    -        "purged" (discarded) after triggering.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
    +
    +  /**
    +    * Evaluates the window and outputs none or several elements.
    +    *
    +    * @param key The key for which this window is evaluated.
    +    * @param window The window that is being evaluated.
    +    * @param input The elements in the window being evaluated.
    +    * @param out A collector for emitting elements.
    +    *
    +    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    +    */
    +  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Delta trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5000 milliseconds in the example).
    -        A window is actually fired when the value of the last added element exceeds the value of
    -        the first element inserted in the window according to a `DeltaFunction`.
    -      </p>
    +</div>
    +</div>
    +
    +Here we show an example that uses a `WindowFunction` to count the elements in a window. We do this
    +because we want to access information about the window itself to emit it along with the count.
    +This is very inefficient, however, because every element has to be buffered until the window fires;
    +in practice a count should be computed with a `ReduceFunction`. Below, we will see how a `ReduceFunction` can
    +be combined with a `WindowFunction` to get both incremental aggregation and the added
    +information of a `WindowFunction`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
     {% highlight java %}
    -windowedStream.trigger(new DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    -    @Override
    -    public double getDelta (Double old, Double new) {
    -        return (new - old > 0.01);
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyWindowFunction());
    +
    +/* ... */
    +
    +public class MyWindowFunction implements WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
    +
    +  public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    +    long count = 0;
    +    for (Tuple2<String, Long> in: input) {
    +      count++;
         }
    -}));
    +    out.collect("Window: " + window + " count: " + count);
    +  }
    +}
    +
     {% endhighlight %}
    -    </td>
    -  </tr>
    - </tbody>
    -</table>
     </div>
     
    -
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -    <td><strong>Processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when the current processing time exceeds its end-value.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ProcessingTimeTrigger.create);
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Watermark trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(EventTimeTrigger.create);
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        The window is actually fired only when the current processing time exceeds its end-value.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous watermark time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        A window is actually fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Count trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when it has more than a certain number of elements (1000 below).
    -        The elements of the triggered window are retained.
    -      </p>
     {% highlight scala %}
    -windowedStream.trigger(CountTrigger.of(1000));
    +val input: DataStream[(String, Long)] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyWindowFunction())
    +
    +/* ... */
    +
    +class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
    +
    +  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    +    var count = 0L
    +    for (in <- input) {
    +      count = count + 1
    +    }
    +    out.collect(s"Window $window count: $count")
    +  }
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Purging trigger</strong></td>
    -    <td>
    -      <p>
    -        Takes any trigger as an argument and forces the triggered window elements to be
    -        "purged" (discarded) after triggering.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
    +</div>
    +</div>
    +
    +### WindowFunction with Incremental Aggregation
    +
    +A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction`. This allows
    +you to get the benefit of incremental window computation while still having access to the additional
    +meta information that a `WindowFunction` provides.
    +
    +This is an example that shows how incremental aggregation functions can be combined with
    +a `WindowFunction`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +// for folding incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +
    +// for reducing incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyReduceFunction(), new MyWindowFunction());
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Delta trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5000 milliseconds in the example).
    -        A window is actually fired when the value of the last added element exceeds the value of
    -        the first element inserted in the window according to a `DeltaFunction`.
    -      </p>
    +</div>
    +
    +<div data-lang="scala" markdown="1">
     {% highlight scala %}
    -windowedStream.trigger(DeltaTrigger.of(5000.0, { (old,new) => new - old > 0.01 }))
    +val input: DataStream[(String, Long)] = ...
    +
    +// for folding incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction())
    +
    +// for reducing incremental computation
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyReduceFunction(), new MyWindowFunction())
     {% endhighlight %}
    -    </td>
    -  </tr>
    - </tbody>
    -</table>
     </div>
    -
     </div>
     
    -After the trigger fires, and before the function (e.g., `sum`, `count`) is applied to the window contents, an
    -optional `Evictor` removes some elements from the beginning of the window before the remaining elements
    -are passed on to the function. Flink comes bundled with a set of evictors You can write your own evictor by
    -implementing the `Evictor` interface.
    +## Dealing with Late Data
     
    -<div class="codetabs" markdown="1">
    +When working with event-time windowing, it can happen that elements arrive late, i.e., the
    +watermark that Flink uses to keep track of the progress of event-time is already past the
    +end timestamp of a window to which an element belongs. Please
    +see [event time](apis/streaming/event_time.html) and especially
    +[late elements](apis/streaming/event_time.html#late-elements) for a more thorough discussion of
    +how Flink deals with event time.
    +
    +For a windowed transformation you can specify how it should deal with late elements, i.e., how much
    +lateness is allowed. The parameter for this is called *allowed lateness*; it specifies by how much time
    +elements can be late. Elements that arrive within the allowed lateness are still put into windows
    +and are considered when computing window results. Elements that arrive after the allowed lateness
    +are dropped. Flink will also make sure that any state held by the windowing operation is garbage
    +collected once the watermark passes the end of a window plus the allowed lateness.
     
    +You can specify an allowed lateness like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -      <td><strong>Time evictor</strong></td>
    -      <td>
    -        <p>
    -         Evict all elements from the beginning of the window, so that elements from end-value - 1 second
    -         until end-value are retained (the resulting window size is 1 second).
    -        </p>
    -  {% highlight java %}
    -triggeredStream.evictor(TimeEvictor.of(Time.seconds(1)));
    -  {% endhighlight %}
    -      </td>
    -    </tr>
    -   <tr>
    -       <td><strong>Count evictor</strong></td>
    -       <td>
    -         <p>
    -          Retain 1000 elements from the end of the window backwards, evicting all others.
    -         </p>
    -   {% highlight java %}
    -triggeredStream.evictor(CountEvictor.of(1000));
    -   {% endhighlight %}
    -       </td>
    -     </tr>
    -    <tr>
    -        <td><strong>Delta evictor</strong></td>
    -        <td>
    -          <p>
    -            Starting from the beginning of the window, evict elements until an element with
    -            value lower than the value of the last element is found (by a threshold and a
    -            DeltaFunction).
    -          </p>
    -    {% highlight java %}
    -triggeredStream.evictor(DeltaEvictor.of(5000, new DeltaFunction<Double>() {
    -  public double getDelta (Double oldValue, Double newValue) {
    -      return newValue - oldValue;
    -  }
    -}));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    - </tbody>
    -</table>
    +{% highlight java %}
    +DataStream<T> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .allowedLateness(<time>)
    +    .<windowed transformation>(<window function>);
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -      <td><strong>Time evictor</strong></td>
    -      <td>
    -        <p>
    -         Evict all elements from the beginning of the window, so that elements from end-value - 1 second
    -         until end-value are retained (the resulting window size is 1 second).
    -        </p>
    -  {% highlight scala %}
    -triggeredStream.evictor(TimeEvictor.of(Time.seconds(1)));
    -  {% endhighlight %}
    -      </td>
    -    </tr>
    -   <tr>
    -       <td><strong>Count evictor</strong></td>
    -       <td>
    -         <p>
    -          Retain 1000 elements from the end of the window backwards, evicting all others.
    -         </p>
    -   {% highlight scala %}
    -triggeredStream.evictor(CountEvictor.of(1000));
    -   {% endhighlight %}
    -       </td>
    -     </tr>
    -    <tr>
    -        <td><strong>Delta evictor</strong></td>
    -        <td>
    -          <p>
    -            Starting from the beginning of the window, evict elements until an element with
    -            value lower than the value of the last element is found (by a threshold and a
    -            DeltaFunction).
    -          </p>
    -    {% highlight scala %}
    -windowedStream.evictor(DeltaEvictor.of(5000.0, { (old,new) => new - old > 0.01 }))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    - </tbody>
    -</table>
    -</div>
    +{% highlight scala %}
    +val input: DataStream[T] = ...
     
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .allowedLateness(<time>)
    +    .<windowed transformation>(<window function>)
    +{% endhighlight %}
    +</div>
     </div>
     
    -### Recipes for Building Windows
    -
    -The mechanism of window assigner, trigger, and evictor is very powerful, and it allows you to define
    -many different kinds of windows. Flink's basic window constructs are, in fact, syntactic
    -sugar on top of the general mechanism. Below is how some common types of windows can be
    -constructed using the general mechanism
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 35%">Window type</th>
    -      <th class="text-center">Definition</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td>
    -	  <strong>Tumbling count window</strong><br>
    -    {% highlight java %}
    -stream.countWindow(1000)
    -    {% endhighlight %}
    -	</td>
    -        <td>
    -    {% highlight java %}
    -stream.window(GlobalWindows.create())
    -  .trigger(PurgingTrigger.of(CountTrigger.of(size)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td>
    -	  <strong>Sliding count window</strong><br>
    -    {% highlight java %}
    -stream.countWindow(1000, 100)
    -    {% endhighlight %}
    -	</td>
    -        <td>
    -    {% highlight java %}
    -stream.window(GlobalWindows.create())
    -  .evictor(CountEvictor.of(1000))
    -  .trigger(CountTrigger.of(100))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td>
    -	  <strong>Tumbling event time window</strong><br>
    -    {% highlight java %}
    -stream.timeWindow(Time.seconds(5))
    -    {% endhighlight %}
    -	</td>
    -        <td>
    -    {% highlight java %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(5))
    -  .trigger(EventTimeTrigger.create())
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td>
    -	  <strong>Sliding event time window</strong><br>
    -    {% highlight java %}
    -stream.timeWindow(Time.seconds(5), Time.seconds(1))
    -    {% endhighlight %}
    -	</td>
    -        <td>
    -    {% highlight java %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -  .trigger(EventTimeTrigger.create())
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td>
    -	  <strong>Tumbling processing time window</strong><br>
    -    {% highlight java %}
    -stream.timeWindow(Time.seconds(5))
    -    {% endhighlight %}
    -	</td>
    -        <td>
    -    {% highlight java %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))
    -  .trigger(ProcessingTimeTrigger.create())
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td>
    -	  <strong>Sliding processing time window</strong><br>
    -    {% highlight java %}
    -stream.timeWindow(Time.seconds(5), Time.seconds(1))
    -    {% endhighlight %}
    -	</td>
    -        <td>
    -    {% highlight java %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -  .trigger(ProcessingTimeTrigger.create())
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    -
    -
    -## Windows on Unkeyed Data Streams
    -
    -You can also define windows on regular (non-keyed) data streams using the `windowAll` transformation. These
    -windowed data streams have all the capabilities of keyed windowed data streams, but are evaluated at a single
    -task (and hence at a single computing node). The syntax for defining triggers and evictors is exactly the
    -same:
    +The time can be one of `Time.seconds(x)`, `Time.minutes(x)`, and so on.
    +
    +<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner, no
    +data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
    +
    +## Triggers
    +
    +A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
    +processed by the *window function*. The trigger observes how elements are added to windows
    +and can also keep track of the progress of processing time and event time. Once a trigger
    +determines that a window is ready for processing, it fires. This is the signal for the
    +window operation to take the elements that are currently in the window and pass them along to
    +the window function to produce output for the firing window.
    +
    +Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
    +appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
    +default trigger. This trigger simply fires once the watermark passes the end of a window.
    +
    +You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
    +whole specification of the windowed transformation would then look like this:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -nonKeyedStream
    -    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10));
    +DataStream<T> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .trigger(<trigger>)
    +    .<windowed transformation>(<window function>);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -nonKeyedStream
    -    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10))
    +val input: DataStream[T] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .trigger(<trigger>)
    +    .<windowed transformation>(<window function>)
     {% endhighlight %}
     </div>
     </div>
     
    -Basic window definitions are also available for windows on non-keyed streams:
    +Flink comes with a few triggers out of the box: there is the already mentioned `EventTimeTrigger` that
    +fires based on the progress of event-time as measured by the watermark. The `ProcessingTimeTrigger`
    +does the same but based on processing time, and the `CountTrigger` fires once the number of elements
    +in a window reaches the given limit.
    +
    +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
    +are overriding the default trigger of a `WindowAssigner`. For example, if you specify a
    +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
    +progress of time but only by count. Right now, you have to write your own custom trigger if
    +you want to react based on both time and count.
    +
    +The internal `Trigger` API is still considered experimental, but you can check out the code
    +if you want to write your own custom trigger:
    +{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
    +
    +## Non-keyed Windowing
    +
    +You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however,
    +that Flink cannot process windows for different keys in parallel, essentially turning the
    +transformation into a non-parallel operation.
    +
    +
    +The basic structure of a non-keyed windowed transformation is as follows:
    --- End diff --
    
    How about this:
    
    `<span class="label label-danger">Warning</span>` As mentioned in the introduction, non-keyed
    windows have the disadvantage that work cannot be distributed in the cluster because
    windows cannot be computed independently per key. This can have severe performance implications.
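
To illustrate the performance point above, here is a minimal plain-Java model (no Flink dependency) that contrasts how keyed and non-keyed windowing spread elements over parallel instances. The class `KeyedVsNonKeyedSketch`, its methods, and the hash-modulo routing are hypothetical simplifications chosen for this sketch. Flink's actual key partitioning differs in detail.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model. This is NOT Flink's partitioning code.
 * Keyed windows: each key is routed to one of `parallelism` instances.
 * Non-keyed windows (windowAll): every element goes to a single instance.
 */
final class KeyedVsNonKeyedSketch {

    // Assumed routing rule: hash of the key modulo the parallelism.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Number of elements each parallel instance receives when the stream is keyed.
    static int[] keyedLoad(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        for (String key : keys) {
            load[instanceFor(key, parallelism)]++;
        }
        return load;
    }

    // Without keyBy(), all elements end up at one instance, whatever the parallelism.
    static int[] nonKeyedLoad(List<String> keys, int parallelism) {
        int[] load = new int[parallelism];
        load[0] = keys.size();
        return load;
    }

    // Counts the instances that actually receive work.
    static int busyInstances(int[] load) {
        int busy = 0;
        for (int l : load) {
            if (l > 0) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("user-" + (i % 50)); // 50 distinct keys
        }
        System.out.println("keyed busy instances:     " + busyInstances(keyedLoad(keys, 4)));
        System.out.println("non-keyed busy instances: " + busyInstances(nonKeyedLoad(keys, 4)));
    }
}
```

With 50 distinct keys and a parallelism of 4, the keyed variant keeps all four instances busy. The non-keyed variant funnels all 1000 elements into a single instance.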


> Update Windowing Documentation
> ------------------------------
>
>                 Key: FLINK-4062
>                 URL: https://issues.apache.org/jira/browse/FLINK-4062
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom trigger.
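
The issue asks for documentation of the new allowed-lateness setting. The diff describes the rule: elements within the allowed lateness are still added, and window state is cleaned up once the watermark passes the end of the window plus the allowed lateness. The following plain-Java model sketches that rule by deciding whether an element of a tumbling event-time window would be dropped. The class `AllowedLatenessSketch` and the offset-free tumbling-window arithmetic are our own assumptions, as is the choice to treat a window's last valid timestamp as `end - 1`. This is not Flink's operator code.

```java
/**
 * Hypothetical model of allowed lateness for tumbling event-time windows.
 * All times are in milliseconds. This is not Flink code.
 */
final class AllowedLatenessSketch {

    // Start of the tumbling window containing `timestamp` (assumes timestamp >= 0, no offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    // Assumed cleanup point: last timestamp in the window (end - 1) plus the allowed lateness.
    static long cleanupTime(long timestamp, long size, long allowedLateness) {
        return windowEnd(timestamp, size) - 1 + allowedLateness;
    }

    // An element is dropped once the watermark has reached the cleanup time of its window.
    static boolean isDropped(long timestamp, long size, long allowedLateness, long watermark) {
        return cleanupTime(timestamp, size, allowedLateness) <= watermark;
    }

    public static void main(String[] args) {
        long size = 5_000;     // 5-second tumbling windows
        long lateness = 2_000; // 2 seconds of allowed lateness
        long ts = 3_000;       // element belongs to window [0, 5000)

        // Watermark is past the window end but within the allowed lateness: still accepted.
        System.out.println(isDropped(ts, size, lateness, 6_000)); // prints false
        // Watermark is past end + allowed lateness: dropped.
        System.out.println(isDropped(ts, size, lateness, 7_000)); // prints true
    }
}
```

Under these assumptions, an element with timestamp 3000 in a 5-second window with 2 seconds of allowed lateness is still accepted at watermark 6000. From watermark 6999 on, it is dropped.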



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
