flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
Date Fri, 02 Mar 2018 14:19:20 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171857970
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     
     	@Override
     	public CompletableFuture<String> triggerSavepoint(
    -		@Nullable final String targetDirectory,
    -		final Time timeout) {
    -		try {
    -			return executionGraph.getCheckpointCoordinator()
    -				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    -				.thenApply(CompletedCheckpoint::getExternalPointer);
    -		} catch (Exception e) {
    -			return FutureUtils.completedExceptionally(e);
    +			@Nullable final String targetDirectory,
    +			final boolean cancelJob,
    +			final Time timeout) {
    +
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		if (checkpointCoordinator == null) {
    +			return FutureUtils.completedExceptionally(new IllegalStateException(
    +				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    +		}
    +
    +		if (cancelJob) {
    +			checkpointCoordinator.stopCheckpointScheduler();
    +		}
    +		return checkpointCoordinator
    +			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    +			.thenApply(CompletedCheckpoint::getExternalPointer)
    +			.thenApplyAsync(path -> {
    +				if (cancelJob) {
    +					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
    +					cancel(timeout);
    +				}
    +				return path;
    +			}, getMainThreadExecutor())
    +			.exceptionally(throwable -> {
    +				if (cancelJob) {
    +					startCheckpointScheduler(checkpointCoordinator);
    +				}
    +				throw new CompletionException(throwable);
    +			});
    +	}
    +
    +	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
    --- End diff --
    
    This method can be reused in the job rescaling logic.


---

Mime
View raw message