flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
Date Fri, 02 Mar 2018 14:20:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383618#comment-16383618 ]

ASF GitHub Bot commented on FLINK-8459:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171857970
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     
     	@Override
     	public CompletableFuture<String> triggerSavepoint(
    -		@Nullable final String targetDirectory,
    -		final Time timeout) {
    -		try {
    -			return executionGraph.getCheckpointCoordinator()
    -				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    -				.thenApply(CompletedCheckpoint::getExternalPointer);
    -		} catch (Exception e) {
    -			return FutureUtils.completedExceptionally(e);
    +			@Nullable final String targetDirectory,
    +			final boolean cancelJob,
    +			final Time timeout) {
    +
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		if (checkpointCoordinator == null) {
    +			return FutureUtils.completedExceptionally(new IllegalStateException(
    +				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    +		}
    +
    +		if (cancelJob) {
    +			checkpointCoordinator.stopCheckpointScheduler();
    +		}
    +		return checkpointCoordinator
    +			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    +			.thenApply(CompletedCheckpoint::getExternalPointer)
    +			.thenApplyAsync(path -> {
    +				if (cancelJob) {
    +					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
    +					cancel(timeout);
    +				}
    +				return path;
    +			}, getMainThreadExecutor())
    +			.exceptionally(throwable -> {
    +				if (cancelJob) {
    +					startCheckpointScheduler(checkpointCoordinator);
    +				}
    +				throw new CompletionException(throwable);
    +			});
    +	}
    +
    +	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
    --- End diff --
    
    This method (startCheckpointScheduler) can be reused in the job rescaling logic.


> Implement cancelWithSavepoint in RestClusterClient
> --------------------------------------------------
>
>                 Key: FLINK-8459
>                 URL: https://issues.apache.org/jira/browse/FLINK-8459
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Implement the method
>         {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former has different semantics because the checkpoint scheduler is not stopped, so additional checkpoints may be taken between the savepoint and the job cancellation.
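
The ordering that the second option relies on can be sketched roughly as follows. This is only an illustration under stated assumptions: FakeCoordinator, its methods, and cancelWithSavepoint are hypothetical stand-ins invented for this example, not Flink's CheckpointCoordinator or JobMaster API. The idea is to stop periodic checkpointing first, trigger the savepoint, then cancel the job on success or resume periodic checkpointing on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CancelWithSavepointSketch {

    /** Hypothetical stand-in for a checkpoint coordinator; records the order of actions. */
    static final class FakeCoordinator {
        final List<String> events = new ArrayList<>();
        // Completed by the caller to simulate the savepoint succeeding or failing.
        final CompletableFuture<String> savepoint = new CompletableFuture<>();

        void stopScheduler() { events.add("stop-scheduler"); }
        void startScheduler() { events.add("start-scheduler"); }
        void cancelJob() { events.add("cancel-job"); }

        CompletableFuture<String> triggerSavepoint() {
            events.add("trigger-savepoint");
            return savepoint;
        }
    }

    /**
     * Stops the periodic scheduler before the savepoint so that no further
     * periodic checkpoint is scheduled between the savepoint and the cancellation.
     */
    static CompletableFuture<String> cancelWithSavepoint(FakeCoordinator coordinator) {
        coordinator.stopScheduler();
        return coordinator.triggerSavepoint().whenComplete((path, failure) -> {
            if (failure == null) {
                coordinator.cancelJob();      // savepoint stored: cancel the job
            } else {
                coordinator.startScheduler(); // savepoint failed: job keeps running, resume checkpoints
            }
        });
    }

    public static void main(String[] args) {
        FakeCoordinator succeeding = new FakeCoordinator();
        cancelWithSavepoint(succeeding);
        succeeding.savepoint.complete("/tmp/savepoint-1");
        System.out.println(succeeding.events);

        FakeCoordinator failing = new FakeCoordinator();
        cancelWithSavepoint(failing);
        failing.savepoint.completeExceptionally(new RuntimeException("savepoint failed"));
        System.out.println(failing.events);
    }
}
```

In the first option (savepoint and cancel as two separate calls) the stopScheduler step is absent, which is why a periodic checkpoint could still run between the two calls.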



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
