flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5312: [FLINK-8344][flip6] Add support for HA to RestClus...
Date Thu, 25 Jan 2018 09:21:00 GMT
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r163775574
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
    @@ -258,89 +312,96 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String
savepointDirecto
     
     		final CompletableFuture<SavepointTriggerResponseBody> responseFuture;
     
    -		try {
    -			responseFuture = restClient.sendRequest(
    -				restClusterClientConfiguration.getRestServerAddress(),
    -				restClusterClientConfiguration.getRestServerPort(),
    -				savepointTriggerHeaders,
    -				savepointTriggerMessageParameters,
    -				new SavepointTriggerRequestBody(savepointDirectory));
    -		} catch (IOException e) {
    -			throw new FlinkException("Could not send trigger savepoint request to Flink cluster.",
e);
    -		}
    +		responseFuture = sendRequest(
    +			savepointTriggerHeaders,
    +			savepointTriggerMessageParameters,
    +			new SavepointTriggerRequestBody(savepointDirectory));
     
    -		return responseFuture.thenApply(savepointTriggerResponseBody -> {
    +		return responseFuture.thenCompose(savepointTriggerResponseBody -> {
     			final SavepointTriggerId savepointTriggerId = savepointTriggerResponseBody.getSavepointTriggerId();
    -			final SavepointInfo savepointInfo;
    -			try {
    -				savepointInfo = waitForSavepointCompletion(jobId, savepointTriggerId);
    -			} catch (Exception e) {
    -				throw new CompletionException(e);
    -			}
    +			return waitForSavepointCompletion(jobId, savepointTriggerId);
    +		}).thenApply(savepointInfo -> {
     			if (savepointInfo.getFailureCause() != null) {
     				throw new CompletionException(savepointInfo.getFailureCause());
     			}
     			return savepointInfo.getLocation();
     		});
     	}
     
    -	private SavepointInfo waitForSavepointCompletion(
    +	private CompletableFuture<SavepointInfo> waitForSavepointCompletion(
     			final JobID jobId,
    -			final SavepointTriggerId savepointTriggerId) throws Exception {
    +			final SavepointTriggerId savepointTriggerId) {
     		return waitForResource(() -> {
     			final SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
     			final SavepointStatusMessageParameters savepointStatusMessageParameters =
     				savepointStatusHeaders.getUnresolvedMessageParameters();
     			savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId);
     			savepointStatusMessageParameters.savepointTriggerIdPathParameter.resolve(savepointTriggerId);
    -			return restClient.sendRequest(
    -				restClusterClientConfiguration.getRestServerAddress(),
    -				restClusterClientConfiguration.getRestServerPort(),
    +			return sendRetryableRequest(
     				savepointStatusHeaders,
    -				savepointStatusMessageParameters
    -			);
    +				savepointStatusMessageParameters,
    +				EmptyRequestBody.getInstance(),
    +				isConnectionProblemException());
     		});
     	}
     
     	@Override
     	public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws
Exception {
    -		JobsOverviewHeaders headers = JobsOverviewHeaders.getInstance();
    -		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
    -			restClusterClientConfiguration.getRestServerAddress(),
    -			restClusterClientConfiguration.getRestServerPort(),
    -			headers
    -		);
    -		return jobDetailsFuture
    +		return sendRequest(JobsOverviewHeaders.getInstance())
     			.thenApply(
    -				(MultipleJobsDetails multipleJobsDetails) -> {
    -					final Collection<JobDetails> jobDetails = multipleJobsDetails.getJobs();
    -					Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size());
    -					jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(),
detail.getJobName(), detail.getStatus(), detail.getStartTime())));
    -
    -					return flattenedDetails;
    -			});
    +				(multipleJobsDetails) -> multipleJobsDetails
    +					.getJobs()
    +					.stream()
    +					.map(detail -> new JobStatusMessage(
    +						detail.getJobId(),
    +						detail.getJobName(),
    +						detail.getStatus(),
    +						detail.getStartTime()))
    +					.collect(Collectors.toList()));
     	}
     
     	@Override
     	public T getClusterId() {
     		return clusterId;
     	}
     
    -	private <R, A extends AsynchronouslyCreatedResource<R>> R waitForResource(
    -			final SupplierWithException<CompletableFuture<A>, IOException> resourceFutureSupplier)
    -				throws IOException, InterruptedException, ExecutionException, TimeoutException {
    -		A asynchronouslyCreatedResource;
    -		long attempt = 0;
    -		while (true) {
    -			final CompletableFuture<A> responseFuture = resourceFutureSupplier.get();
    -			asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    -			if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED)
{
    -				break;
    +	/**
    +	 * Polls a {@code AsynchronouslyCreatedResource} until its
    +	 * {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes
    +	 * {@link QueueStatus.Id#COMPLETED COMPLETED}. This method returns a {@code CompletableFuture}
    +	 * which completes with {@link AsynchronouslyCreatedResource#resource()}.
    +	 *
    +	 * @param resourceFutureSupplier The operation which polls for the
    +	 *                               {@code AsynchronouslyCreatedResource}.
    +	 * @param <R>                    The type of the resource.
    +	 * @param <A>                    The type of the {@code AsynchronouslyCreatedResource}.
    +	 * @return A {@code CompletableFuture} delivering the resource.
    +	 */
    +	private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R>
waitForResource(
    --- End diff --
    
    I think this method could be renamed to something like `createResourcePollingFuture`.


---

Mime
View raw message