flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5767: [FLINK-8887] Wait for JobMaster leader election in...
Date Wed, 28 Mar 2018 09:58:39 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5767#discussion_r177684059
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -645,6 +637,56 @@ private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void>
     			jobManagerRunnerTerminationFuture));
     	}
     
    +	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
    +		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
    +
    +		if (jobManagerRunner == null) {
    +			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +		} else {
    +			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
    +			return leaderGatewayFuture.thenApplyAsync(
    +				(JobMasterGateway jobMasterGateway) -> {
    +					// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
    +					if (jobManagerRunners.containsKey(jobId)) {
    +						return jobMasterGateway;
    +					} else {
    +						throw new CompletionException(new FlinkJobNotFoundException(jobId));
    +					}
    +				},
    +				getMainThreadExecutor());
    +		}
    +	}
    +
    +	private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
    +		return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
    +	}
    +
    +	@Nonnull
    +	private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
    +		final int numberJobsRunning = jobManagerRunners.size();
    +
    +		ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
    +			numberJobsRunning);
    +
    +		for (JobID jobId : jobManagerRunners.keySet()) {
    +			final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
    +
    +			final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
    +				.thenCompose(queryFunction::apply)
    +				.handle(
    +					(T value, Throwable throwable) -> {
    +						if (throwable != null) {
    --- End diff --
    
    This is equivalent to `.handle((T value, Throwable throwable) -> Optional.ofNullable(value));`


---

Mime
View raw message