flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
Date Wed, 17 Oct 2018 14:56:00 GMT

https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653673#comment-16653673

ASF GitHub Bot commented on FLINK-10205:

tillrohrmann commented on issue #6684:     [FLINK-10205] Batch Job: InputSplit Fault tolerant
for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-430661796
   Alright, now the problem is a bit clearer to me. The underlying problem is that the `InputSplitAssigner`'s
semantics in case of a failover are not well defined, mainly because Flink evolved over time.
   The general idea of the `InputSplitAssigner` is to lazily assign work to sources which
have completely consumed their current `InputSplit`. The order in which this happens should
not affect the correctness of the result.
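The lazy, pull-based model described above can be sketched as follows. This is a hypothetical, simplified assigner for illustration only; Flink's actual `InputSplitAssigner` interface differs:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * Minimal sketch of a lazy, pull-based split assigner (illustrative names,
 * not Flink's real API). Splits are handed out only when a source task asks.
 */
public class LazySplitAssigner {
    // Splits that no source task has claimed yet, in insertion order.
    private final Queue<String> unassigned = new ArrayDeque<>();

    public LazySplitAssigner(List<String> splits) {
        unassigned.addAll(splits);
    }

    /**
     * Called by a source task each time it has fully consumed its current
     * split. Which task asks first determines the mapping, so the task/split
     * mapping is not deterministic across runs -- only the set of splits is.
     */
    public synchronized String getNextInputSplit(int taskIndex) {
        return unassigned.poll(); // null signals "no more work"
    }
}
```

Because assignment order depends on which task asks first, correctness must not depend on a particular task/split mapping, which is exactly the assumption the comment goes on to question for stateful sources.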
   If you say that in case of a recovery the exact same `InputSplit` assignment needs to happen
again, then I think it must be because our sources have some kind of state. Otherwise, it
should not matter which source task completes the `InputSplit`, right? If this is correct,
then we would run into the same problem if a JM failure happens, because we would lose all
`InputSplit` assignment information which is stored on the JM. So stateful sources with `InputSplits`
don't work at the moment (in the general case).
   If we assume that our sources are stateless, then simply returning the input splits to
the assigner and letting the next idling task take them should work. In your example of the
infinite stream which is initialized via the `InputSplits` there would be no other task competing
for the `InputSplit` of a failed task because by definition they never finish their work,
right? If multiple tasks fail, then the mapping might be different after the recovery, but
every task would continue consuming from a single `InputSplit`. I think the problem here is
that you abused the `InputSplitAssigner` for something it is not yet intended to do.
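For stateless sources, the "return the splits to the assigner" recovery described above could look like this. This is a hedged sketch with illustrative names, not Flink's implementation:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Sketch: on task failure, its in-flight split goes back to the shared pool. */
public class ReturningSplitAssigner {
    private final Queue<String> unassigned = new ArrayDeque<>();
    // The split each task is currently processing, if any.
    private final Map<Integer, String> current = new HashMap<>();

    public ReturningSplitAssigner(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** A new request implies the task's previous split completed successfully. */
    public synchronized String getNextInputSplit(int taskIndex) {
        String split = unassigned.poll();
        if (split != null) {
            current.put(taskIndex, split);
        } else {
            current.remove(taskIndex);
        }
        return split;
    }

    /** Called when taskIndex fails: any other idling task may pick the split up. */
    public synchronized void returnSplitOf(int taskIndex) {
        String lost = current.remove(taskIndex);
        if (lost != null) {
            unassigned.add(lost);
        }
    }
}
```

Note that after recovery the task/split mapping may differ from the first run; for stateless sources that is harmless, which is the comment's point.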
   The reason why I'm a bit hesitant here is that I think we do not fully understand yet
what we actually want to have. Moreover, some corner cases are not clear to me yet. For example,
why would it be ok for a global failover to change the mapping but not for a region failover?
Another example is how to handle the case where we lose a TM and need to downscale. Would
that effectively be a global failover where we redistribute all `InputSplits` (I would think so)?
   Before starting any concrete implementation steps, I think we should properly design this
feature to get it right. A very related topic is actually the new source interface. Depending
on how much we are able to unify batch and streaming, the whole `InputSplit` assignment might
move into a single task (similar to the `ContinuousFileMonitoringFunction`) and the assignment
might become part of a checkpoint. That way, we would no longer need to take care of this
on the JM side.
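The "assignment as part of a checkpoint" idea could be sketched like this: a single coordinator-style task owns the unassigned splits and snapshots them, so after a failover nothing needs to be reconstructed on the JM. All names here are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch: a single coordinator task owns split assignment and snapshots it. */
public class CheckpointedEnumerator {
    // Splits not yet handed out to any reader.
    private final Deque<String> pending;

    public CheckpointedEnumerator(List<String> splits) {
        this.pending = new ArrayDeque<>(splits);
    }

    public String assignNext() {
        return pending.poll();
    }

    /** The still-unassigned splits become part of the checkpoint state. */
    public List<String> snapshotState() {
        return new ArrayList<>(pending);
    }

    /** After a failover, rebuild the enumerator from the last checkpoint. */
    public static CheckpointedEnumerator restore(List<String> checkpointed) {
        return new CheckpointedEnumerator(checkpointed);
    }
}
```

Splits assigned before the checkpoint are covered by the readers' own checkpointed positions; only the pending set needs to live in the coordinator's state.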

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> -------------------------------------------------------
>                 Key: FLINK-10205
>                 URL: https://issues.apache.org/jira/browse/FLINK-10205
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 1.6.1, 1.6.2, 1.7.0
>            Reporter: JIN SUN
>            Assignee: JIN SUN
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>   Original Estimate: 168h
>  Remaining Estimate: 168h
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better performance.
However, when a DataSourceTask fails and reruns, it will not get the same splits as its previous
attempt. This can introduce inconsistent results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch scenario), these two executions
should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask deterministic. The proposal
is to save all splits into the ExecutionVertex so that the DataSourceTask will pull splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
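The proposal quoted above, pre-binding splits to each ExecutionVertex so that a rerun sees exactly the same inputs, could be sketched as follows (hypothetical, illustrative names; the real design is in the linked document):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: splits are fixed per ExecutionVertex, so every rerun sees the same inputs. */
public class PerVertexSplitStore {
    private final Map<Integer, List<String>> byVertex = new HashMap<>();

    /** Deterministically pre-partition the splits round-robin across subtasks. */
    public PerVertexSplitStore(List<String> splits, int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            byVertex.put(i, new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            byVertex.get(i % parallelism).add(splits.get(i));
        }
    }

    /** Every execution attempt of subtask i pulls from the same fixed list. */
    public List<String> splitsFor(int subtaskIndex) {
        return byVertex.get(subtaskIndex);
    }
}
```

Because the partitioning is computed once and kept on the vertex, a restarted attempt (or a speculative duplicate execution) processes exactly the same splits as the original.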

This message was sent by Atlassian JIRA
