flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-1967) Introduce (Event)time in Streaming
Date Wed, 01 Jul 2015 14:50:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14610400#comment-14610400 ]

ASF GitHub Bot commented on FLINK-1967:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/879

    [FLINK-1967] Introduce (Event)time in Streaming

    (This PR also contains other commits that fix bugs discovered while writing tests for
    the new feature and for existing features.)
    
    This is the first step towards supporting proper event-time windowing.
    
    ## [FLINK-1967] Introduce (Event)time in Streaming
    
    This introduces an additional timestamp field in StreamRecord of type
    org.joda.time.Instant. When using a SourceFunction, the timestamp is
    automatically set to Instant.now() upon element emission. The timestamp
    can be manually set using a ManualTimestampSourceFunction.
    
    This also changes the signature of the StreamOperators. They now receive
    a StreamRecord instead of the unwrapped value, which is necessary for
    them to access the timestamp. Previously, the StreamTask unwrapped the
    value from the StreamRecord; now the operators unwrap it themselves.
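    A minimal sketch of the idea, assuming hypothetical `StreamRecord`, `OneInputOperator`
    and `MapOperator` classes (not Flink's actual classes) and using `java.time.Instant`
    in place of `org.joda.time.Instant` so the example has no external dependencies.
    The operator receives the whole record, applies the user function to the value,
    and emits the result with the input's timestamp:

    ```java
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    // Hypothetical stand-in for StreamRecord: a value plus its timestamp.
    final class StreamRecord<T> {
        private final T value;
        private final Instant timestamp;

        StreamRecord(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        T getValue() { return value; }

        Instant getTimestamp() { return timestamp; }

        // Wraps a new value while keeping this record's timestamp.
        <R> StreamRecord<R> replace(R newValue) {
            return new StreamRecord<>(newValue, timestamp);
        }
    }

    // Hypothetical operator interface: gets the whole record, not the bare value.
    interface OneInputOperator<IN, OUT> {
        void processElement(StreamRecord<IN> element, List<StreamRecord<OUT>> out);
    }

    // Applies a user function to the value and forwards the input timestamp unchanged.
    final class MapOperator<IN, OUT> implements OneInputOperator<IN, OUT> {
        private final Function<IN, OUT> fn;

        MapOperator(Function<IN, OUT> fn) { this.fn = fn; }

        @Override
        public void processElement(StreamRecord<IN> element, List<StreamRecord<OUT>> out) {
            out.add(element.replace(fn.apply(element.getValue())));
        }
    }

    class MapOperatorDemo {
        public static void main(String[] args) {
            MapOperator<String, Integer> op = new MapOperator<>(String::length);
            List<StreamRecord<Integer>> out = new ArrayList<>();
            op.processElement(new StreamRecord<>("flink", Instant.ofEpochMilli(1_000L)), out);
            // prints "5 @ 1000"
            System.out.println(out.get(0).getValue() + " @ " + out.get(0).getTimestamp().toEpochMilli());
        }
    }
    ```

    Because the operator sees the record rather than the bare value, keeping the
    input timestamp on the output is a single call, which is exactly the property
    that the operator tests check.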
    
    This also introduces watermarks to track the progress of processing. At
    a configurable interval, the sources emit watermarks that signify that
    this source will not emit any records with a lower timestamp in the
    future. Watermarks are broadcast; stream inputs wait for watermarks
    on all input channels and forward a watermark to the StreamOperator
    once it has advanced on all inputs. Operators are responsible for
    forwarding the watermark once they know that no elements with an
    earlier timestamp will be emitted (this is mostly relevant for
    buffering operations such as windows). Right now, all operators simply
    forward the watermark they receive.
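    The input-side watermark rule can be sketched as follows. `WatermarkTracker` is
    hypothetical, not Flink's implementation; it assumes watermarks are plain `long`
    timestamps and, as an extra assumption of this sketch, ignores a watermark that
    would move a channel backwards. The combined watermark is the minimum over all
    channels and is forwarded only when that minimum advances:

    ```java
    import java.util.Arrays;
    import java.util.OptionalLong;

    // Tracks the latest watermark per input channel and derives the combined one.
    final class WatermarkTracker {
        private final long[] channelWatermarks;
        private long currentWatermark = Long.MIN_VALUE;

        WatermarkTracker(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing seen yet
        }

        // Records a watermark from one channel. Returns the new combined watermark
        // if it advanced, or empty if nothing should be forwarded to the operator.
        OptionalLong onWatermark(int channel, long watermark) {
            if (watermark > channelWatermarks[channel]) {
                channelWatermarks[channel] = watermark;
            }
            long min = Long.MAX_VALUE;
            for (long wm : channelWatermarks) {
                min = Math.min(min, wm);
            }
            if (min > currentWatermark) {
                currentWatermark = min;
                return OptionalLong.of(min);
            }
            return OptionalLong.empty();
        }
    }

    class WatermarkDemo {
        public static void main(String[] args) {
            WatermarkTracker tracker = new WatermarkTracker(2);
            System.out.println(tracker.onWatermark(0, 10L)); // OptionalLong.empty
            System.out.println(tracker.onWatermark(1, 5L));  // OptionalLong[5]
            System.out.println(tracker.onWatermark(1, 20L)); // OptionalLong[10]
        }
    }
    ```

    A channel that has not delivered any watermark yet holds the combined value at
    `Long.MIN_VALUE`, so nothing is forwarded until every input has reported.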
    
    When using a ManualTimestampSourceFunction, the system does not
    automatically emit watermarks; the user must emit them manually
    using the SourceContext.
    
    No watermarks will be emitted unless
    ExecutionConfig.setAutoWatermarkInterval is used to set a watermark
    interval.
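    A sketch of the automatic watermark interval, assuming a hypothetical
    `AutoTimestampContext` with an injectable millisecond clock (this is not Flink's
    `SourceContext`). For simplicity it checks the interval whenever an element is
    collected instead of running a timer, and an interval of 0 stands for "no interval
    configured", mirroring the rule that no watermarks are emitted unless one is set:

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.LongSupplier;

    // Hypothetical source-side context for sources with automatic timestamps.
    final class AutoTimestampContext<T> {
        private final LongSupplier clock;        // current time in ms (injectable for tests)
        private final long watermarkIntervalMs;  // 0 = no interval configured
        private long lastWatermarkTime;
        final List<String> emitted = new ArrayList<>(); // stand-in for the output stream

        AutoTimestampContext(LongSupplier clock, long watermarkIntervalMs) {
            this.clock = clock;
            this.watermarkIntervalMs = watermarkIntervalMs;
            this.lastWatermarkTime = clock.getAsLong();
        }

        // Stamps the element with the current time and, if the configured interval
        // has elapsed, follows it with a watermark.
        void collect(T element) {
            long now = clock.getAsLong();
            emitted.add("record " + element + " @ " + now);
            if (watermarkIntervalMs > 0 && now - lastWatermarkTime >= watermarkIntervalMs) {
                // Assuming a non-decreasing clock, later elements get timestamps >= now,
                // so `now` is a valid watermark.
                emitted.add("watermark " + now);
                lastWatermarkTime = now;
            }
        }
    }

    class AutoWatermarkDemo {
        public static void main(String[] args) {
            long[] now = {0L};
            AutoTimestampContext<String> ctx = new AutoTimestampContext<>(() -> now[0], 500L);
            ctx.collect("a");
            now[0] = 600L;
            ctx.collect("b");
            System.out.println(ctx.emitted); // [record a @ 0, record b @ 600, watermark 600]
        }
    }
    ```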
    
    ## [FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing
    
    This changes CoReader (now CoStreamingRecordReader) to reuse
    UnionGate for the input multiplexing. This way, it no longer locks onto
    one specific input side and instead reads events from both input sides.
    
    This also enables an event listener for checkpoint barriers so that the
    TwoInputTask now reacts to those and correctly forwards them.
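    Why multiplexing both inputs through one gate matters can be illustrated with a
    simplified, hypothetical `RoundRobinUnion` over in-memory queues. It is only a
    stand-in for the idea, not the actual UnionGate implementation: because each read
    resumes after the input served last, a checkpoint barrier waiting on one side is
    not starved by a steady stream of records on the other side.

    ```java
    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;

    // One event read from the union, tagged with the index of its input side.
    final class Tagged {
        final int input;
        final String event;

        Tagged(int input, String event) {
            this.input = input;
            this.event = event;
        }

        @Override
        public String toString() {
            return input + ":" + event;
        }
    }

    // Hypothetical union over several inputs that never locks onto one side.
    final class RoundRobinUnion {
        private final List<Queue<String>> inputs;
        private int nextIndex = 0;

        RoundRobinUnion(List<Queue<String>> inputs) {
            this.inputs = inputs;
        }

        // Returns the next available event from any input, or null if all are empty.
        Tagged next() {
            for (int i = 0; i < inputs.size(); i++) {
                int idx = (nextIndex + i) % inputs.size();
                String event = inputs.get(idx).poll();
                if (event != null) {
                    nextIndex = (idx + 1) % inputs.size(); // start after this side next time
                    return new Tagged(idx, event);
                }
            }
            return null;
        }
    }

    class UnionDemo {
        public static void main(String[] args) {
            Queue<String> records = new ArrayDeque<>(Arrays.asList("r1", "r2", "r3"));
            Queue<String> other = new ArrayDeque<>(Arrays.asList("barrier-1"));
            RoundRobinUnion union = new RoundRobinUnion(Arrays.asList(records, other));
            for (Tagged t = union.next(); t != null; t = union.next()) {
                System.out.println(t); // 0:r1, 1:barrier-1, 0:r2, 0:r3
            }
        }
    }
    ```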
    
    It also adds CoStreamCheckpointingITCase to verify that checkpointing
    works in topologies with TwoInputStreamTasks.
    
    This also adds tests for OneInputStreamTask and TwoInputStreamTask
    that check whether:
     - open()/close() of RichFunctions are correctly
       called
     - Watermarks are correctly forwarded
     - Supersteps/checkpoint barriers are correctly forwarded and the
       blocking of inputs works correctly
    
    ## Add proper tests for Stream Operators
    
    These test whether:
     - open()/close() on RichFunctions are called
     - Timestamps of emitted elements match the timestamp of the input
       element
     - Watermarks are correctly forwarded
    
    ## [FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests
    
    Before, a CheckpointBarrier from a more recent Checkpoint would also
    trigger unblocking while waiting on an older CheckpointBarrier. Now,
    a CheckpointBarrier from a newer checkpoint will unblock all channels
    and start a new wait on the new Checkpoint.
    
    The tests for OneInputStreamTask and TwoInputStreamTask check whether
    the buffer behaves correctly when receiving CheckpointBarriers from more
    recent checkpoints while still waiting on an older CheckpointBarrier.
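    The new barrier rule can be sketched as a small state machine. This hypothetical
    `BarrierAligner` only tracks which channels are blocked (it does not buffer their
    data, which the real BarrierBuffer has to do) and assumes non-negative checkpoint
    IDs that increase with newer checkpoints. A barrier from a newer checkpoint
    unblocks all channels and starts a new wait; as an assumption of this sketch,
    barriers from older checkpoints are simply ignored.

    ```java
    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical sketch of barrier alignment where a newer checkpoint wins.
    final class BarrierAligner {
        static final long NONE = -1L;

        private final int numChannels;
        private final Set<Integer> blocked = new HashSet<>();
        private long currentCheckpointId = NONE;
        private boolean waiting = false;

        BarrierAligner(int numChannels) {
            this.numChannels = numChannels;
        }

        // Handles a barrier arriving on `channel`. Returns the checkpoint ID once
        // barriers for it have arrived on all channels, otherwise NONE.
        long onBarrier(int channel, long checkpointId) {
            if (checkpointId > currentCheckpointId) {
                // Newer checkpoint: unblock all channels and start a new wait.
                blocked.clear();
                currentCheckpointId = checkpointId;
                waiting = true;
            } else if (checkpointId < currentCheckpointId || !waiting) {
                // Barrier from an older checkpoint, or the current one already completed.
                return NONE;
            }
            blocked.add(channel); // block this channel until alignment completes
            if (blocked.size() == numChannels) {
                blocked.clear();
                waiting = false;
                return currentCheckpointId;
            }
            return NONE;
        }

        boolean isBlocked(int channel) {
            return blocked.contains(channel);
        }
    }

    class BarrierDemo {
        public static void main(String[] args) {
            BarrierAligner aligner = new BarrierAligner(2);
            System.out.println(aligner.onBarrier(0, 1L)); // -1: waiting for channel 1
            System.out.println(aligner.onBarrier(1, 2L)); // -1: checkpoint 2 replaces 1
            System.out.println(aligner.isBlocked(0));     // false: channel 0 was unblocked
            System.out.println(aligner.onBarrier(0, 2L)); // 2: checkpoint 2 is aligned
        }
    }
    ```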
    
    ## Performance Testing
    
    To measure the impact of these changes, I ran this program:
    
    ```
    env
        .addSource(new TupleSource(numGroups))
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS))
        .sum(1)
        .flatten()
        .writeAsCsv(<blabla>);
    ```
    
    `TupleSource` emits `("group x", 1)` tuples in a loop. I used a Google Compute Engine
    cluster with 3 workers and DOP=6.
    
    These are the average numbers of tuples per group over a few minutes of runtime:
    
     - master: 1,093,726
     - event-time branch: 949,541
     - event-time branch, watermark every 500 ms: 887,087
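    To put these numbers in relative terms, the following sketch divides each result
    by the master figure (the class name is illustrative only):

    ```java
    import java.util.Locale;

    // Relative throughput of each run compared with the master baseline.
    class ThroughputComparison {
        static double relativeToBaseline(long baseline, long measured) {
            return (double) measured / baseline;
        }

        public static void main(String[] args) {
            long master = 1_093_726L;
            long eventTime = 949_541L;
            long watermark500ms = 887_087L;
            // Locale.ROOT keeps '.' as the decimal separator regardless of system locale.
            System.out.printf(Locale.ROOT, "event-time branch:      %.1f%% of master%n",
                    100 * relativeToBaseline(master, eventTime));
            System.out.printf(Locale.ROOT, "watermark every 500 ms: %.1f%% of master%n",
                    100 * relativeToBaseline(master, watermark500ms));
        }
    }
    ```

    This works out to roughly 86.8% of master for the event-time branch and 81.1%
    with a 500 ms watermark interval, i.e. drops of about 13% and 19%.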


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink event-time

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/879.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #879
    
----
commit 2673e7af3cded1c7fa24a715f30e317a3640a193
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date:   2015-06-22T10:26:44Z

    [FLINK-1967] Introduce (Event)time in Streaming
    

commit 9fa1a7d12b4c2b2fbf65580f5e5e0de98979ca45
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date:   2015-06-29T15:12:59Z

    [FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing
    

commit 3da4c4c99ccff7f18c822699c1857b7a56672878
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date:   2015-06-30T10:25:40Z

    Add proper tests for Stream Operators
    

commit 16dcb7ee96a0c6edd4549fe3f42073681dc81493
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date:   2015-06-30T14:14:25Z

    Ignore KafkaITCase.testPersistentSourceWithOffsetUpdates

commit 7416eff02319b1eb695fa6e2986b93d9b51f2d05
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Date:   2015-06-30T15:10:05Z

    [FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests
    

----


> Introduce (Event)time in Streaming
> ----------------------------------
>
>                 Key: FLINK-1967
>                 URL: https://issues.apache.org/jira/browse/FLINK-1967
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in the stream record and changing the sources
> to add timestamps to records. This will also introduce punctuations (or low watermarks) to
> allow windows to work correctly on unordered, timestamped input data. In the process, the
> windowing subsystem also needs to be adapted to use the punctuations. Furthermore, all
> operators need to be made aware of punctuations and correctly forward them. Then, a new operator
> must be introduced to allow modification of timestamps.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
