flink-issues mailing list archives

From "Konstantin Knauf (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes
Date Fri, 17 May 2019 08:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841524#comment-16841524 ]

Konstantin Knauf edited comment on FLINK-12381 at 5/17/19 8:23 AM:
-------------------------------------------------------------------

[~till.rohrmann] I am happy to take this. Let me summarize what I think we need to do based
on the discussion:

1) To solve the issue at hand, we only need to make sure that {{JobID}}s are randomly generated
by default in {{StandaloneJobClusterEntrypoint}} when HA is disabled, instead of defaulting
to {{0}}. Are we breaking any public API by this? As far as I know, no, and a note in the
release docs suffices.
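
Roughly what I have in mind, as a minimal sketch (the helper and its name are illustrative,
not the actual {{StandaloneJobClusterEntrypoint}} code):

{code:java}
import org.apache.flink.api.common.JobID;

// Hypothetical helper to illustrate the proposed default; the real change
// would live in StandaloneJobClusterEntrypoint.
final class JobIdDefaults {

    static JobID resolveJobId(String configuredJobId) {
        if (configuredJobId != null) {
            // an explicitly configured id still takes precedence
            return JobID.fromHexString(configuredJobId);
        }
        // HA disabled: generate a fresh random JobID per cluster start
        // instead of the fixed all-zero default
        return new JobID();
    }
}
{code}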

2) In the case of a Job Cluster with HA enabled, my understanding is that there could still
be a conflict even with different {{high-availability.cluster-id}}s, because the {{cluster-id}}
is not part of the checkpoint path (or is it?). It only prevents clashes in ZooKeeper. So,
if there are left-over checkpoints (a retained checkpoint, or a job that never reached a
terminal state) from a previous job with a different {{cluster-id}} but the same
{{state.checkpoints.dir}} and the same default {{JobID}} ({{0}}), there would still be an
issue. Therefore, in this case we would like to inject the {{JobID}} instead of defaulting
to {{0}}.
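
To make the clash concrete: assuming the checkpoint layout is just
{{<state.checkpoints.dir>/<JobID>/chk-<n>}} (which the stack trace below suggests), two Job
Clusters with different {{cluster-id}}s but the same default {{JobID}} end up writing to the
very same location:

{code}
cluster-id "a": gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-16/_metadata
cluster-id "b": gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-16/_metadata
{code}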





> W/o HA, upon a full restart, checkpointing crashes
> --------------------------------------------------
>
>                 Key: FLINK-12381
>                 URL: https://issues.apache.org/jira/browse/FLINK-12381
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.8.0
>         Environment: Same as FLINK-12379, FLINK-12377, FLINK-12376
>            Reporter: Henrik
>            Assignee: Konstantin Knauf
>            Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-16/_metadata' already exists
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the job completely. Partial and undefined failure is not what should happen.
>  
> Repro:
>  # Set up a single-purpose job cluster (which could use much better docs btw!)
>  # Let it run with GCS checkpointing for a while with rocksdb/gs://example (see the config sketch after these steps)
>  # Kill it
>  # Start it
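> 
> A minimal {{flink-conf.yaml}} sketch matching this repro (the bucket name is illustrative, taken from the stack trace above):
> {code}
> state.backend: rocksdb
> state.checkpoints.dir: gs://example_bucket/flink/checkpoints
> # high-availability is left unset, i.e. HA is disabled
> {code}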


