flink-issues mailing list archives

From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5312: [FLINK-8344][flip6] Add support for HA to RestClus...
Date Fri, 26 Jan 2018 10:14:41 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r164075001
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -258,89 +312,96 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     
     		final CompletableFuture<SavepointTriggerResponseBody> responseFuture;
     
    -		try {
    -			responseFuture = restClient.sendRequest(
    -				restClusterClientConfiguration.getRestServerAddress(),
    -				restClusterClientConfiguration.getRestServerPort(),
    -				savepointTriggerHeaders,
    -				savepointTriggerMessageParameters,
    -				new SavepointTriggerRequestBody(savepointDirectory));
    -		} catch (IOException e) {
    -			throw new FlinkException("Could not send trigger savepoint request to Flink cluster.", e);
    -		}
    +		responseFuture = sendRequest(
    +			savepointTriggerHeaders,
    +			savepointTriggerMessageParameters,
    +			new SavepointTriggerRequestBody(savepointDirectory));
     
    -		return responseFuture.thenApply(savepointTriggerResponseBody -> {
    +		return responseFuture.thenCompose(savepointTriggerResponseBody -> {
     			final SavepointTriggerId savepointTriggerId = savepointTriggerResponseBody.getSavepointTriggerId();
    -			final SavepointInfo savepointInfo;
    -			try {
    -				savepointInfo = waitForSavepointCompletion(jobId, savepointTriggerId);
    -			} catch (Exception e) {
    -				throw new CompletionException(e);
    -			}
    +			return waitForSavepointCompletion(jobId, savepointTriggerId);
    +		}).thenApply(savepointInfo -> {
     			if (savepointInfo.getFailureCause() != null) {
     				throw new CompletionException(savepointInfo.getFailureCause());
     			}
     			return savepointInfo.getLocation();
     		});
     	}
     
    -	private SavepointInfo waitForSavepointCompletion(
    +	private CompletableFuture<SavepointInfo> waitForSavepointCompletion(
    --- End diff --
    
    This method is now named `pollSavepointAsync`.
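
The diff above replaces a blocking wait inside `thenApply` with a future-returning call chained through `thenCompose`. A minimal, self-contained sketch of that pattern follows. It does not use Flink's classes: `Info`, `trigger`, and the body of `pollSavepointAsync` are hypothetical stand-ins invented for illustration, and the failure rule (job IDs ending in "bad") is an assumption made only so that both paths can be exercised.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SavepointComposeSketch {

    // Hypothetical stand-in for a savepoint result; not Flink's SavepointInfo.
    static final class Info {
        final String location;
        final Throwable failureCause;

        Info(String location, Throwable failureCause) {
            this.location = location;
            this.failureCause = failureCause;
        }
    }

    // Hypothetical async trigger: completes with a trigger id for the job.
    static CompletableFuture<String> trigger(String jobId) {
        return CompletableFuture.supplyAsync(() -> "trigger-" + jobId);
    }

    // Hypothetical async poll: returns a future instead of blocking the caller.
    // Assumption for the sketch: ids ending in "bad" report a failure cause.
    static CompletableFuture<Info> pollSavepointAsync(String triggerId) {
        return CompletableFuture.supplyAsync(() ->
            triggerId.endsWith("bad")
                ? new Info(null, new IllegalStateException("savepoint failed"))
                : new Info("/savepoints/" + triggerId, null));
    }

    static CompletableFuture<String> cancelWithSavepoint(String jobId) {
        return trigger(jobId)
            // thenCompose flattens the nested future returned by the poll,
            // so no thread has to block waiting for the savepoint.
            .thenCompose(SavepointComposeSketch::pollSavepointAsync)
            // thenApply handles the plain value once it is available.
            .thenApply(info -> {
                if (info.failureCause != null) {
                    throw new CompletionException(info.failureCause);
                }
                return info.location;
            });
    }

    public static void main(String[] args) {
        System.out.println(cancelWithSavepoint("job1").join());
        try {
            cancelWithSavepoint("bad").join();
        } catch (CompletionException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

With `thenApply`, a function that returns a future would yield a nested `CompletableFuture<CompletableFuture<Info>>`; `thenCompose` is the variant that unwraps it, which is why the synchronous try/catch block could be dropped.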

