flink-issues mailing list archives

From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes
Date Wed, 08 May 2019 10:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835474#comment-16835474 ]

Till Rohrmann edited comment on FLINK-12381 at 5/8/19 10:06 AM:
----------------------------------------------------------------

I think the observed problem is caused by FLINK-10291. FLINK-10291 was needed because the
{{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} from the user code stored
within the container image. In order to resume from existing checkpoints after a failover,
the {{JobGraph}} had to be created with a predictable {{JobID}}.

Due to this change, when using HA one either needs to configure a different {{high-availability.cluster-id}}
for different jobs/clusters or start the {{StandaloneJobClusterEntrypoint}} with a different
{{JobID}} via the {{-jid}}/{{--job-id}} option (FLINK-11525). Otherwise it is not possible to
distinguish different clusters (in the context of the job mode, a cluster is equivalent to a job).
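
As a configuration sketch only (the key names are from the Flink HA documentation; the id value and ZooKeeper setup are placeholders), separating two HA job clusters could look like:

{code}
# flink-conf.yaml for job cluster A (cluster id is a placeholder)
high-availability: zookeeper
high-availability.cluster-id: /job-cluster-a
{code}

Each job cluster would get its own {{cluster-id}}, so their HA state (and the recovered {{JobGraph}}s) stay separate.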

Unfortunately, this also has implications for the checkpointing directory, which uses the {{JobID}}
to distinguish different jobs. So at the moment I would recommend specifying the {{JobID}}
via the {{--job-id}} command line option when starting the {{StandaloneJobClusterEntrypoint}}.
Alternatively, you would need to configure different checkpointing directories for different
job clusters.
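
For illustration only (the launcher script is the one shipped with the Flink distribution; the job class and id are placeholders), starting the entrypoint with an explicit {{JobID}} could look like:

{code}
# Sketch: give each job cluster its own JobID so that HA state and
# checkpoint paths (<checkpoint-dir>/<job-id>/chk-N) do not collide.
# --job-id (added by FLINK-11525) expects a 32-character hex string.
./bin/standalone-job.sh start-foreground \
  --job-classname com.example.MyFlinkJob \
  --job-id 00000000000000000000000000000001
{code}

Alternatively, pointing {{state.checkpoints.dir}} at a per-cluster path achieves the same separation without touching the {{JobID}}.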


> W/o HA, upon a full restart, checkpointing crashes
> --------------------------------------------------
>
>                 Key: FLINK-12381
>                 URL: https://issues.apache.org/jira/browse/FLINK-12381
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.8.0
>         Environment: Same as FLINK-12379, FLINK-12377, FLINK-12376
>            Reporter: Henrik
>            Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 'gs://example_bucket/flink/checkpoints/00000000000000000000000000000000/chk-16/_metadata' already exists
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the job completely.
> Partial and undefined failure is not what should happen.
>  
> Repro:
>  # Set up a single purpose job cluster (which could use much better docs btw!)
>  # Let it run with GCS checkpointing for a while with rocksdb/gs://example
>  # Kill it
>  # Start it



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
