flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
Date Fri, 24 Nov 2017 14:45:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265345#comment-16265345 ]

Aljoscha Krettek commented on FLINK-7883:
-----------------------------------------

The problem is that this is harder than it looks. In a Flink pipeline, data does not only originate
at sources; other parts of the pipeline can also produce events "out of thin air". Off the top
of my head, I can think of:
 - Sources: this is obvious, sources emit events
 - Reactions to watermark (event-time) timers: these can emit events. They are not as easy to
control, because watermarks can originate not only at sources but also at any operator in the pipeline
 - Reactions to processing-time timers: these can also emit events at any time
 - "Special" operators that emit data from a separate thread. Currently these are the
Async I/O operator and the {{ContinuousFileReaderOperator}}, which forms the "file source"
together with {{ContinuousFileMonitoringFunction}}.
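To illustrate why stopping the source alone is not enough, here is a minimal, self-contained model. It is a sketch under stated assumptions: {{StoppableSource}}, {{TimerOperator}} and {{onProcessingTime()}} are hypothetical stand-ins written for this example, not Flink classes. The operator buffers input and flushes it when a (simulated) processing-time timer fires, so records still leave the operator after the source has been stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (NOT Flink's API): a source feeding an operator
// that also emits data from a processing-time timer callback.
public class ThinAirEmission {

    /** A source that emits into an operator, but only while running. */
    static final class StoppableSource {
        private boolean running = true;
        private int next = 0;

        void stop() { running = false; }

        /** Emits one record per call while running; does nothing once stopped. */
        void poll(TimerOperator downstream) {
            if (running) {
                downstream.process("source-" + next++);
            }
        }
    }

    /** Buffers input and flushes it downstream when a processing-time timer fires. */
    static final class TimerOperator {
        private final List<String> buffer = new ArrayList<>();
        final List<String> emitted = new ArrayList<>();

        void process(String record) { buffer.add(record); }

        /** Timer callback: emits data without any new input from the source. */
        void onProcessingTime() {
            for (String r : buffer) {
                emitted.add("timer:" + r);
            }
            buffer.clear();
        }
    }

    /** Stops the source, then fires the timer; returns what was emitted downstream. */
    static List<String> runScenario() {
        StoppableSource source = new StoppableSource();
        TimerOperator op = new TimerOperator();
        source.poll(op);
        source.poll(op);
        source.stop();
        source.poll(op);        // no-op: the source is stopped
        op.onProcessingTime();  // ...yet the operator still emits two records
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // [timer:source-0, timer:source-1]
    }
}
```

The same effect would apply to watermark-driven timers and to operators that emit from their own thread, which is why the whole pipeline, not just the source, has to be considered.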

If we want to make sure that we don't emit any unwanted data, we have to _quiesce_ the whole
pipeline first. This can be done with special messages (similar to watermarks) that are injected
at the sources in response to a message from the {{JobManager}} and then traverse the topology.
All operators would have to report that they are quiesced before we take the savepoint. If the
savepoint fails for some reason, we need to _un-quiesce_ the pipeline again.

The above is the hard part. Making sure that we don't emit data from a "source" is as simple
as ensuring that {{SourceContext.collect()}} doesn't forward the data that the source wants
to emit.
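The "easy part" can be sketched as a wrapper whose {{collect()}} stops forwarding once the pipeline is quiesced. This is a minimal sketch, assuming a simplified one-method {{Context}} interface invented for this example; Flink's real {{SourceFunction.SourceContext}} has further methods (e.g. {{collectWithTimestamp}}, {{emitWatermark}}) that a real wrapper would also have to gate. When exactly the gate is closed relative to the savepoint snapshot is left open here, as it is in the comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a gated source context. Context and Gated are NOT Flink classes.
public class GatedSourceContext {

    /** Simplified stand-in for a source context; only collect() is modelled. */
    interface Context<T> {
        void collect(T record);
    }

    /** Forwards records to the wrapped context until quiesce() is called. */
    static final class Gated<T> implements Context<T> {
        private final Context<T> downstream;
        private volatile boolean quiesced = false;
        private long dropped = 0; // only touched by the source thread in this sketch

        Gated(Context<T> downstream) { this.downstream = downstream; }

        void quiesce() { quiesced = true; }

        void unquiesce() { quiesced = false; }

        long droppedCount() { return dropped; }

        @Override
        public void collect(T record) {
            if (quiesced) {
                dropped++; // the source keeps calling collect(), but nothing is forwarded
                return;
            }
            downstream.collect(record);
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        Gated<String> ctx = new Gated<>(emitted::add);
        ctx.collect("a");
        ctx.quiesce();
        ctx.collect("b"); // dropped
        System.out.println(emitted + " dropped=" + ctx.droppedCount()); // [a] dropped=1
    }
}
```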

We definitely have to solve this problem, but I don't think we can do it for the 1.5 release:
the community decided on a very short release cycle after 1.4.0 because a number of important
features are almost ready to be released.

> Stop fetching source before a cancel with savepoint
> ---------------------------------------------------
>
>                 Key: FLINK-7883
>                 URL: https://issues.apache.org/jira/browse/FLINK-7883
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call once the
> savepoint is finished, but during the savepoint execution the Kafka source continues to poll new
> messages, which will not be part of the savepoint and will be replayed on the next application
> start.
> A solution could be to stop fetching the source stream task before triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}}
> that existing SourceFunction implementations could implement.
> We could add a {{stopFetchingSource}} property to the {{CheckpointOptions}} class to pass the
> desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}}
> to {{SourceStreamTask.triggerCheckpoint}}.
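The proposal in the issue description can be sketched with simplified stand-ins. Only the names {{StoppableFetchingSourceFunction}}, {{stopFetching}} and {{stopFetchingSource}} come from the proposal; {{Options}}, {{PollingSource}} and {{SourceTask}} are hypothetical stand-ins and do not reflect the real signatures of Flink's {{CheckpointOptions}} or {{SourceStreamTask}}. The task stops fetching before the (omitted) snapshot when the flag is set and the source opts in.

```java
// Hypothetical sketch of the proposal. Options and SourceTask are stand-ins,
// NOT Flink's CheckpointOptions / SourceStreamTask.
public class StopFetchingProposal {

    /** Proposed opt-in interface for sources that can stop polling their external system. */
    interface StoppableFetchingSourceFunction {
        void stopFetching();
    }

    /** Stand-in for CheckpointOptions carrying the proposed flag. */
    static final class Options {
        final boolean stopFetchingSource;

        Options(boolean stopFetchingSource) { this.stopFetchingSource = stopFetchingSource; }
    }

    /** A toy Kafka-like source that polls until told to stop fetching. */
    static final class PollingSource implements StoppableFetchingSourceFunction {
        private volatile boolean fetching = true;
        private int polled = 0;

        @Override
        public void stopFetching() { fetching = false; }

        void pollOnce() {
            if (fetching) {
                polled++;
            }
        }

        int polledCount() { return polled; }
    }

    /** Stand-in for the source task's checkpoint trigger. */
    static final class SourceTask {
        private final Object source;

        SourceTask(Object source) { this.source = source; }

        void triggerCheckpoint(Options options) {
            // For cancel-with-savepoint, stop fetching before the snapshot is taken,
            // so no records are read that the savepoint would not cover.
            if (options.stopFetchingSource && source instanceof StoppableFetchingSourceFunction) {
                ((StoppableFetchingSourceFunction) source).stopFetching();
            }
            // ... snapshot state and emit the checkpoint barrier (omitted) ...
        }
    }

    public static void main(String[] args) {
        PollingSource src = new PollingSource();
        SourceTask task = new SourceTask(src);
        src.pollOnce();
        task.triggerCheckpoint(new Options(true));
        src.pollOnce(); // ignored: fetching was stopped
        System.out.println(src.polledCount()); // 1
    }
}
```

Note that this only covers the source itself; as the comment above points out, timers and operators such as Async I/O could still emit data while the savepoint is in progress.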



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
