flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
Date Fri, 02 Mar 2018 14:18:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383616#comment-16383616 ]

ASF GitHub Bot commented on FLINK-8459:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171857517
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
    @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     
     	@Override
     	public CompletableFuture<String> triggerSavepoint(
    -		@Nullable final String targetDirectory,
    -		final Time timeout) {
    -		try {
    -			return executionGraph.getCheckpointCoordinator()
    -				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    -				.thenApply(CompletedCheckpoint::getExternalPointer);
    -		} catch (Exception e) {
    -			return FutureUtils.completedExceptionally(e);
    +			@Nullable final String targetDirectory,
    +			final boolean cancelJob,
    +			final Time timeout) {
    +
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		if (checkpointCoordinator == null) {
    +			return FutureUtils.completedExceptionally(new IllegalStateException(
    +				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    +		}
    +
    +		if (cancelJob) {
    +			checkpointCoordinator.stopCheckpointScheduler();
    +		}
    +		return checkpointCoordinator
    +			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    +			.thenApply(CompletedCheckpoint::getExternalPointer)
    +			.thenApplyAsync(path -> {
    +				if (cancelJob) {
    +					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
    +					cancel(timeout);
    +				}
    +				return path;
    +			}, getMainThreadExecutor())
    +			.exceptionally(throwable -> {
    +				if (cancelJob) {
    +					startCheckpointScheduler(checkpointCoordinator);
    --- End diff --
    
    If the cancellation fails, the `exceptionally` branch restarts the checkpoint scheduler as well. I think this differs from the previous implementation.
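One possible way to address the review comment is to attach the recovery step to the savepoint future only, so that a failed cancellation does not restart periodic checkpoints. The sketch below assumes that this is the intended behavior; the review only says the current code differs from the previous implementation. `SchedulerControl`, `cancelWithSavepoint`, and the two suppliers are hypothetical stand-ins, not Flink's `CheckpointCoordinator` or `JobMaster` API, and the sketch ignores the main-thread executor that the diff uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Hypothetical, simplified model of the cancel-with-savepoint flow in the diff.
 * SchedulerControl and the two suppliers are illustrative stand-ins, not Flink's
 * CheckpointCoordinator or JobMaster API.
 */
final class CancelWithSavepointSketch {

    /** Stand-in for the scheduler part of a checkpoint coordinator. */
    static final class SchedulerControl {
        boolean running = true;

        void stop() { running = false; }

        void start() { running = true; }
    }

    /** Creates an already-failed future without relying on Java 9's failedFuture. */
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /**
     * Stops periodic checkpoints, takes a savepoint, then cancels the job.
     * The scheduler is restarted only when the savepoint fails; a failed
     * cancellation is propagated to the caller with the scheduler left stopped.
     */
    static CompletableFuture<String> cancelWithSavepoint(
            SchedulerControl scheduler,
            Supplier<CompletableFuture<String>> triggerSavepoint,
            Supplier<CompletableFuture<Void>> cancelJob) {
        scheduler.stop();
        CompletableFuture<String> savepoint = triggerSavepoint.get()
                .whenComplete((path, failure) -> {
                    if (failure != null) {
                        // The savepoint failed, so the job keeps running: resume checkpoints.
                        scheduler.start();
                    }
                });
        // Runs only after a successful savepoint; errors from cancelJob are not
        // intercepted here, so they do not restart the scheduler.
        return savepoint.thenCompose(path -> cancelJob.get().thenApply(ignored -> path));
    }

    public static void main(String[] args) {
        SchedulerControl scheduler = new SchedulerControl();
        CompletableFuture<String> result = cancelWithSavepoint(
                scheduler,
                () -> CompletableFuture.completedFuture("file:/tmp/savepoint-1"),
                () -> failed(new IllegalStateException("cancel failed")));
        System.out.println("failed: " + result.isCompletedExceptionally()
                + ", scheduler running: " + scheduler.running);
    }
}
```

Running `main` prints `failed: true, scheduler running: false`: the savepoint succeeded, the cancellation failed, and the scheduler stays stopped. Whether leaving the scheduler stopped after a failed cancellation is desirable is exactly the design question the review raises.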


> Implement cancelWithSavepoint in RestClusterClient
> --------------------------------------------------
>
>                 Key: FLINK-8459
>                 URL: https://issues.apache.org/jira/browse/FLINK-8459
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Implement the method
>         {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory)}}
> either by taking a savepoint and cancelling the job separately, or by migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former has different semantics because the checkpoint scheduler is not stopped, so there is no guarantee that no additional checkpoints are taken between the savepoint and the job cancellation.
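The semantic difference described above can be illustrated with a toy, single-threaded model. The class and method names (`SavepointOrderingModel`, `tick`, `separateCalls`, `stopSchedulerFirst`) are hypothetical and not part of Flink; `tick()` simply stands in for the periodic checkpoint scheduler firing in the gap between the savepoint and the cancel call.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy, single-threaded model of the ordering issue described in FLINK-8459.
 * None of these names are Flink APIs; tick() stands in for the periodic
 * checkpoint scheduler firing.
 */
final class SavepointOrderingModel {

    private final List<String> events = new ArrayList<>();
    private boolean schedulerRunning = true;
    private int nextCheckpointId = 1;

    /** Triggers a periodic checkpoint only while the scheduler is running. */
    void tick() {
        if (schedulerRunning) {
            events.add("checkpoint-" + nextCheckpointId++);
        }
    }

    void stopScheduler() { schedulerRunning = false; }

    void savepoint() { events.add("savepoint"); }

    void cancel() { events.add("cancel"); }

    List<String> events() { return events; }

    /** Approach 1: savepoint and cancel as two independent calls; the scheduler keeps running. */
    static List<String> separateCalls() {
        SavepointOrderingModel job = new SavepointOrderingModel();
        job.savepoint();
        job.tick(); // a periodic checkpoint can still fire before cancel()
        job.cancel();
        return job.events();
    }

    /** Approach 2: stop the scheduler first, as the cancelJob branch in the diff does. */
    static List<String> stopSchedulerFirst() {
        SavepointOrderingModel job = new SavepointOrderingModel();
        job.stopScheduler();
        job.savepoint();
        job.tick(); // suppressed: the scheduler is stopped
        job.cancel();
        return job.events();
    }

    public static void main(String[] args) {
        System.out.println("separate calls:       " + separateCalls());
        System.out.println("stop scheduler first: " + stopSchedulerFirst());
    }
}
```

In this model the first approach records `[savepoint, checkpoint-1, cancel]`, while the second records only `[savepoint, cancel]`. In a real job the gap is a race rather than a fixed step, which is why the separate-calls approach can only make no guarantee rather than always produce an extra checkpoint.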



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
