flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4062) Update Windowing Documentation
Date Fri, 01 Jul 2016 08:06:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358614#comment-15358614 ]

ASF GitHub Bot commented on FLINK-4062:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2154#discussion_r69263452
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -24,1023 +24,608 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    +Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
    +slices based on the timestamps of elements or other criteria. This division is required when working
    +with infinite streams of data and performing transformations that aggregate elements.
    +
    +<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.,
    +windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
    +subdivided based on both window and key before being given to
    +a user function. The work can thus be distributed across the cluster
    +because the elements for different keys can be processed independently. If you absolutely have to,
    +you can check out [non-keyed windowing](#non-keyed-windowing), where we describe how non-keyed
    +windows work.
    +
     * This will be replaced by the TOC
     {:toc}
     
    -## Windows on Keyed Data Streams
    -
    -Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
    -i.e., each window will contain elements with the same key value.
    +## Basics
     
    -### Basic Window Constructs
    +For a windowed transformation you must at least specify a *key*
    +(see [specifying keys](apis/common/index.html#specifying-keys)),
    +a *window assigner*, and a *window function*. The *key* divides the infinite, non-keyed stream
    +into logical keyed streams, while the *window assigner* assigns elements to finite per-key windows.
    +Finally, the *window function* is used to process the elements of each window.
     
    -Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
    -for common use cases. See first if your use case can be served by the pre-defined windows below before moving
    -to defining your own windows.
    +The basic structure of a windowed transformation is thus as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<T> input = ...;
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that elements are
    -          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
    -          The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5));
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
    -             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -      {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight java %}
    -keyedStream.countWindow(1000);
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows overlap by at most 900 elements).
    -  {% highlight java %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>);
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[T] = ...
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that elements are
    -          grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
    -          The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5))
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
    -             grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -      {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight scala %}
    -keyedStream.countWindow(1000)
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
    -          grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows overlap by at most 900 elements).
    -  {% highlight scala %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>)
    +{% endhighlight %}
     </div>
     </div>
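The skeleton above only names the three parts. The following plain-Java model shows what they compute together for a keyed sum over 5-second tumbling windows. It has no Flink dependency, and the `Event` type and all method names are hypothetical. The key is the user, the "assigner" rounds each timestamp down to the start of its window, and the "window function" adds up the values per key and window. It models only the result on a finite list. It says nothing about how Flink schedules or stores windows.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of keyBy(user) + 5 s tumbling windows + a summing window function.
// It illustrates the semantics on a finite list only; it is not Flink code.
final class KeyedWindowSketch {

    // Hypothetical element type: key, event timestamp in milliseconds, and a value.
    static final class Event {
        final String user;
        final long timestamp;
        final int value;

        Event(String user, long timestamp, int value) {
            this.user = user;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // "Window assigner": start of the tumbling window [start, start + size) containing the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Groups by key, then by window start, and applies the "window function" (a sum).
    static Map<String, Map<Long, Integer>> sumPerKeyAndWindow(List<Event> events, long size) {
        Map<String, Map<Long, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestamp, size);
            result.computeIfAbsent(e.user, k -> new TreeMap<>())
                  .merge(start, e.value, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("user1", 1_000, 1),
                new Event("user2", 2_000, 5),
                new Event("user1", 4_999, 2),
                new Event("user1", 5_000, 3));
        // prints {user1={0=3, 5000=3}, user2={0=5}}
        System.out.println(sumPerKeyAndWindow(events, 5_000));
    }
}
```

The element at 5000 ms opens a new window for `user1`. The elements at 1000 ms and 4999 ms share the window that starts at 0.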
     
    -### Advanced Window Constructs
    +We will cover [window assigners](#window-assigners) in a separate section below.
    +
    +The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
    +`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
    +of specifying a windowed transformation in detail below: [window functions](#window-functions).
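The three function types differ in how much of the window they see. The following plain-Java analogy illustrates this. Standard `java.util.function` types stand in for Flink's `ReduceFunction`, `FoldFunction` and `WindowFunction`, so this is not Flink's API:

- A reduce combines two elements of the same type.
- A fold combines an accumulator, which may have a different type, with one element at a time.
- An apply-style function receives all elements of the window at once.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java analogy for the three kinds of window function; these are not Flink's interfaces.
final class WindowFunctionKinds {

    // reduce(): (T, T) -> T applied pairwise; output type equals input type.
    // Assumes a non-empty window.
    static <T> T reduce(List<T> window, BinaryOperator<T> reducer) {
        T acc = window.get(0);
        for (int i = 1; i < window.size(); i++) {
            acc = reducer.apply(acc, window.get(i));
        }
        return acc;
    }

    // fold(): (ACC, T) -> ACC starting from an initial value; ACC may differ from T.
    static <T, A> A fold(List<T> window, A initial, BiFunction<A, T, A> folder) {
        A acc = initial;
        for (T element : window) {
            acc = folder.apply(acc, element);
        }
        return acc;
    }

    // apply(): sees the whole window contents at once.
    static <T, R> R apply(List<T> window, Function<List<T>, R> windowFunction) {
        return windowFunction.apply(window);
    }

    public static void main(String[] args) {
        List<Integer> window = List.of(3, 1, 4);
        System.out.println(reduce(window, Integer::sum));               // prints 8
        System.out.println(fold(window, "", (acc, x) -> acc + x));      // prints 314
        System.out.println(apply(window, w -> w.size() + " elements")); // prints 3 elements
    }
}
```

A consequence of this design is that reduce and fold can be evaluated incrementally, keeping one value per window. An apply-style function, by contrast, needs every element kept until the window is evaluated.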
    +
    +For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
    +is considered *ready for processing*. Triggers are covered in more detail in
    +[triggers](#triggers).
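The following sketch illustrates what "ready for processing" can mean for an event-time window. It rests on two assumptions. First, a window covers the half-open interval [start, end). Second, a watermark with value t means that no more elements with timestamps at or before t are expected (see the event time documentation). Under these assumptions, the window is ready once the watermark reaches end - 1, the largest timestamp that still belongs to it. The `isReady` helper is hypothetical and is not a Flink method.

```java
// Hypothetical readiness check for an event-time window [start, end); not Flink's Trigger API.
final class EventTimeReadiness {

    // Largest timestamp that still belongs to the half-open window [start, end).
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Assumed watermark semantics: once the watermark reaches maxTimestamp,
    // no further elements for this window are expected, so it can be evaluated.
    static boolean isReady(long windowEnd, long watermark) {
        return watermark >= maxTimestamp(windowEnd);
    }

    public static void main(String[] args) {
        long windowEnd = 5_000; // window [0, 5000) in milliseconds
        System.out.println(isReady(windowEnd, 4_998)); // prints false
        System.out.println(isReady(windowEnd, 4_999)); // prints true
    }
}
```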
    +
    +## Window Assigners
    +
    +The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
    +with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
    +*sliding windows*, *session windows*, and *global windows*, but you can implement your own by
    +extending the `WindowAssigner` class. All the built-in window assigners, except for the global
    +windows assigner, assign elements to windows based on time, which can either be processing time or event
    +time.
    +
    +Let's first look at how each of these window assigners works before looking at how they can be used
    +in a Flink program. We will be using abstract figures to visualize the workings of each assigner:
    +in the following, the purple circles are elements of the stream, which are partitioned
    +by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
    +of time.
    +
    +### Global Windows
    +
    +Global windows are a way of specifying that we don't want to subdivide our elements into windows.
    +Each element is assigned to a single per-key *global window*.
    +This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
    +no computation is ever going to be performed, as the global window does not have a natural end at
    +which we could process the aggregated elements.
    +
    +<img src="non-windowed.svg" class="center" style="width: 80%;" />
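A global window never ends on its own, so something else must decide when to evaluate it. The following plain-Java sketch models one possible choice: a trigger that sums and emits a key's buffered elements after every `n` elements of that key, then clears the buffer. The fire-and-clear behaviour and all names here are assumptions for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one global window per key, evaluated by a count-based trigger.
final class GlobalWindowCountTrigger {
    private final int n;
    private final Map<String, List<Integer>> buffers = new HashMap<>();
    private final List<String> fired = new ArrayList<>();

    GlobalWindowCountTrigger(int n) {
        this.n = n;
    }

    // Adds an element to its key's global window; when n elements are buffered,
    // "fires" by emitting key:sum and then clears that key's buffer.
    void add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() == n) {
            int sum = buffer.stream().mapToInt(Integer::intValue).sum();
            fired.add(key + ":" + sum);
            buffer.clear();
        }
    }

    List<String> firedResults() {
        return fired;
    }

    public static void main(String[] args) {
        GlobalWindowCountTrigger windows = new GlobalWindowCountTrigger(2);
        windows.add("user1", 1);
        windows.add("user2", 10);
        windows.add("user1", 2);  // user1 reaches 2 elements and fires
        windows.add("user2", 20); // user2 reaches 2 elements and fires
        windows.add("user1", 5);  // buffered, not fired yet
        System.out.println(windows.firedResults()); // prints [user1:3, user2:30]
    }
}
```

Without the size check, `firedResults()` would stay empty forever, which is exactly the situation the paragraph above warns about.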
    +
    +### Tumbling Windows
    +
    +A *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
    +specified *window size*. For example, if you specify a window size of 5 minutes, the window
    +function will get 5 minutes' worth of elements in each invocation.
     
    -The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
    -below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
    -but the execution of the window function is triggered when 100 elements have been added to the
    -window, and every time execution is triggered, 10 elements are retained in the window:
    +<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
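For time-based tumbling windows, assigning an element amounts to rounding its timestamp down to a multiple of the window size. The following sketch assumes:

- millisecond timestamps;
- windows aligned to timestamp 0, with no offset;
- half-open windows `[start, start + size)`.

It uses `Math.floorMod` so that negative timestamps are also handled correctly. It is a standalone illustration, not Flink's implementation.

```java
// Standalone sketch of tumbling-window assignment; each window is [start, start + size).
final class TumblingAssigner {

    // Start of the single window that contains the timestamp (windows aligned to 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000; // 5 minutes in milliseconds
        long[] timestamps = {0, 299_999, 300_000, 450_000};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            // e.g. "299999 -> [0, 300000)" and "300000 -> [300000, 600000)"
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Every timestamp maps to exactly one window, which is what makes tumbling windows non-overlapping.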
    +
    +### Sliding Windows
    +
    +The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
    +as with the tumbling windows assigner, but in this case windows can overlap. The user-specified
    +*window slide* parameter controls how often a new window is started; when the slide is smaller than
    +the size, consecutive windows overlap by *window size* minus *window slide*. Because windows overlap,
    +an element can be assigned to multiple windows.
    +
    +For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10
    +minutes' worth of elements in each invocation of the window function, and it will be invoked for every
    +5 minutes of data.
    +
    +<img src="sliding-windows.svg" class="center" style="width: 80%;" />
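An element belongs to every sliding window whose start is a multiple of the slide and lies in `(timestamp - size, timestamp]`. The following sketch enumerates those windows. It assumes millisecond timestamps, windows aligned to timestamp 0, and half-open windows `[start, start + size)`. It is an illustration, not Flink's code. For the 10-minute/5-minute example above, each element lands in `size / slide = 2` windows.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of sliding-window assignment; each window is [start, start + size).
final class SlidingAssigner {

    // Returns the start of every window that contains the timestamp, latest window first.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000;
        // 10-minute windows sliding by 5 minutes; an element at minute 12
        for (long start : windowStarts(12 * minute, 10 * minute, 5 * minute)) {
            // prints [10min, 20min) and then [5min, 15min)
            System.out.println("[" + start / minute + "min, " + (start + 10 * minute) / minute + "min)");
        }
    }
}
```

If the slide were larger than the size, the loop would return no window at all for timestamps that fall between windows. Those elements would then not be assigned anywhere.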
    +
    +### Session Windows
    +
    +The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the
    +incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
    +that start at fixed time points and have a fixed *window size*. With session windows it is possible
    +to have windows that start at individual points in time for each key and that end once there has
    +been a certain period of inactivity. The configuration parameter is the *session gap*, which specifies
    +how long to wait for new data before considering a session closed.
    +
    +<img src="session-windows.svg" class="center" style="width: 80%;" />
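One way to model session windows is to give each element its own window `[timestamp, timestamp + gap)` and then merge windows that overlap. The following sketch does this for the timestamps of a single key. It merges only strictly overlapping windows, so two elements exactly one gap apart start separate sessions. That boundary rule is an assumption of the sketch, not a statement about Flink's behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of session windows for one key: one window per element, then merge overlapping ones.
final class SessionWindowsSketch {

    // Returns sessions as {start, end} pairs, where end = last timestamp + gap (exclusive).
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] window = {ts, ts + gap};
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && window[0] < last[1]) {
                last[1] = Math.max(last[1], window[1]); // overlaps: extend the current session
            } else {
                result.add(window);                     // no overlap: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {22, 1, 3, 20};
        for (long[] s : sessions(timestamps, 5)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")"); // prints [1, 8) and then [20, 27)
        }
    }
}
```

Unlike the tumbling and sliding cases, the session boundaries here depend only on the data, which is the property the paragraph above describes.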
    +
    +### Specifying a Window Assigner
    +
    +The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
    +windowing and one for event-time windowing. The processing-time assigners assign elements to
    +windows based on the current clock of the worker machines, while the event-time assigners assign
    +elements to windows based on the timestamps of the elements. Please have a look at
    +[event time](/apis/streaming/event_time.html) to learn about the difference between processing time
    +and event time and about how timestamps can be assigned to elements.
    +
    +The following code snippets show how each of the window assigners can be used in a program:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10));
    +DataStream<T> input = ...;
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10))
    +val input: DataStream[T] = ...
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>)
     {% endhighlight %}
     </div>
     </div>
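The processing-time and event-time variants differ mainly in which timestamp drives the window arithmetic. In the following sketch, a `LongSupplier` stands in for the worker's clock in the processing-time case, and the event-time case reads the element's own timestamp. The tumbling-window arithmetic, aligned to 0, is the same in both cases. The class and method names are illustrative, not Flink API. In a real program the clock could be `System::currentTimeMillis`; here it is fixed so the output is repeatable.

```java
import java.util.function.LongSupplier;

// Sketch: the same tumbling-window arithmetic driven by two different notions of time.
final class TimeNotionSketch {

    static long windowStart(long time, long size) {
        return time - Math.floorMod(time, size);
    }

    // Event time: uses the timestamp carried by the element.
    static long eventTimeWindow(long elementTimestamp, long size) {
        return windowStart(elementTimestamp, size);
    }

    // Processing time: ignores the element's timestamp and asks the local clock instead.
    static long processingTimeWindow(LongSupplier clock, long size) {
        return windowStart(clock.getAsLong(), size);
    }

    public static void main(String[] args) {
        long size = 5_000;
        long elementTimestamp = 3_000;    // when the event happened
        LongSupplier clock = () -> 7_500; // fixed "now" on the worker, for a repeatable demo
        System.out.println(eventTimeWindow(elementTimestamp, size)); // prints 0
        System.out.println(processingTimeWindow(clock, size));       // prints 5000
    }
}
```

The same element can therefore land in different windows depending on the chosen notion of time, which is why the choice matters for results, not just for performance.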
     
    -The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally),
    -and (3) an `Evictor` (optionally).
    +## Window Functions
     
    -The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements
    -that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
    -to some notion of time described above within these values are part of the window).
    +The *window function* is used to process the elements of each window (and key) once the system
    +determines that a window is ready for processing (see [triggers](#triggers) for how the system
    +determines when a window is ready).
     
    -For example, the `SlidingEventTimeWindows`
    -assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
    -time starts from 0 and is measured in milliseconds. Then, we have 6 windows
    -that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
    -element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be
    -assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your
    -own window types by extending the `WindowAssigner` class.
    +The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The former
    --- End diff --
    
    done


> Update Windowing Documentation
> ------------------------------
>
>                 Key: FLINK-4062
>                 URL: https://issues.apache.org/jira/browse/FLINK-4062
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom trigger.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
