flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
Date Wed, 02 Aug 2017 16:42:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111251#comment-16111251 ]

ASF GitHub Bot commented on FLINK-7334:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130929937
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
     		if (registeredTaskManagers.containsKey(taskManagerId)) {
     			final RegistrationResponse response = new JMTMRegistrationSuccess(
     				resourceId, libraryCacheManager.getBlobServerPort());
    -			return FlinkCompletableFuture.completed(response);
    +			return CompletableFuture.completedFuture(response);
     		} else {
    -			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
    -				@Override
    -				public TaskExecutorGateway call() throws Exception {
    -					return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    -							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
    -				}
    -			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
    -				@Override
    -				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
    -					if (throwable != null) {
    -						return new RegistrationResponse.Decline(throwable.getMessage());
    -					}
    -
    -					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
    -						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
    -										"leader session ID {} did not equal the received leader session ID {}.",
    -								taskManagerId, taskManagerRpcAddress,
    -								JobMaster.this.leaderSessionID, leaderId);
    -						return new RegistrationResponse.Decline("Invalid leader session id");
    -					}
    -
    -					slotPoolGateway.registerTaskManager(taskManagerId);
    -					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
    -
    -					// monitor the task manager as heartbeat target
    -					taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
    -						@Override
    -						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    -							// the task manager will not request heartbeat, so this method will never be called currently
    +			return getRpcService()
    +				.connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    --- End diff --
    
    Because we were needlessly blocking a thread from the `RpcService`'s `Executor` by calling
`get` on the future returned by `RpcService#connect`.
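
    To illustrate the point, here is a minimal, self-contained sketch of the two patterns.
It does not use Flink classes: `connect`, `registerBlocking` and `registerNonBlocking` are
hypothetical stand-ins, and a plain `String` plays the role of the gateway and of the
registration response. The actual change in the pull request may be shaped differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NonBlockingConnectSketch {

    // Hypothetical stand-in for RpcService#connect: completes asynchronously.
    static CompletableFuture<String> connect(String address) {
        return CompletableFuture.supplyAsync(() -> "gateway@" + address);
    }

    // Old pattern: an executor thread is occupied only to wait on the connect future.
    static CompletableFuture<String> registerBlocking(String address, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Blocks the executor thread until connect finishes or times out.
                return connect(address).get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor).handle(NonBlockingConnectSketch::toResponse);
    }

    // New pattern: chain directly on the connect future; no thread waits on get().
    static CompletableFuture<String> registerNonBlocking(String address, Executor executor) {
        return connect(address).handleAsync(NonBlockingConnectSketch::toResponse, executor);
    }

    // Maps the outcome of connect to a (hypothetical) textual registration response.
    static String toResponse(String gateway, Throwable failure) {
        return failure != null
            ? "declined: " + failure.getMessage()
            : "registered " + gateway;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(registerBlocking("tm-1", executor).join());
            System.out.println(registerNonBlocking("tm-1", executor).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

    Both variants yield the same response, but in the second one the executor only runs the
short callback once the connection is available, so its threads stay free while `connect`
is still in flight.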


> Replace Flink's futures by CompletableFuture in RpcGateway
> ----------------------------------------------------------
>
>                 Key: FLINK-7334
>                 URL: https://issues.apache.org/jira/browse/FLINK-7334
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
