flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor
Date Wed, 24 Aug 2016 12:37:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434842#comment-15434842 ]

ASF GitHub Bot commented on FLINK-4449:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2410#discussion_r76047656
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +126,142 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    +	 * Register a {@link org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor} at the resource manager.
     	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    -	 *
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID)
    +	{
    +		log.info("Received taskExecutor registration with resource id {} from {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(
    +				final TaskExecutorGateway taskExecutorGateway)
    +			{
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if (taskExecutorGateway == null) {
    +					log.warn("ResourceManager {} decline taskExecutor registration with resource id {} from {} because cannot connect to it using given address",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// register target taskExecutor to heartbeat manager
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					long heartbeatInterval = heartbeatManager.registerTarget(resourceID, taskExecutorGateway, taskExecutorAddress);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatInterval);
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +	}
    +
    +	/**
    +	 * notify lost heartbeat with specified taskExecutor
    +	 *
    +	 * @param resourceID identify the taskManager which lost heartbeat with
    +	 */
    +	void notifyLostHeartbeat(final ResourceID resourceID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				TaskExecutorGateway failedTaskManager = taskExecutorGateways.remove(resourceID);
    +				if (failedTaskManager != null) {
    +					heartbeatManager.stopHeartbeatToTaskExecutor(resourceID);
    +					failedTaskManager.markedFailed(leaderSessionID);
    +				}
    +			}
    +		});
    +	}
    +
    +
    +	/**
    +	 * notify slotReport which is sent by taskManager to resourceManager
    +	 *
    +	 * @param slotReport the slot allocation report from taskManager
    +	 */
    +	void handleSlotReportFromTaskManager(final SlotReport slotReport) {
    +
    +	}
    +
    +
    +	/**
    +	 * callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				heartbeatManager = new ResourceManagerToTaskExecutorHeartbeatManager(ResourceManager.this, newLeaderSessionID, log);
    --- End diff --
    
    Why can't the heartbeat manager always be created and only be activated with the new leader session ID? Then we would save some object creations.
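
For illustration, a minimal sketch of what this suggestion could look like. It assumes a heartbeat manager that is constructed once and exposes `activate`/`deactivate` methods. All class and method names here (`ReusableHeartbeatManager`, `ResourceManagerSketch`, `handleRevokeLeadership`, `accepts`) are hypothetical and not taken from the pull request.

```java
import java.util.UUID;

/** Hypothetical stand-in for the heartbeat manager in the diff; not the actual Flink class. */
class ReusableHeartbeatManager {
    // null while the manager is inactive (no leadership held)
    private UUID leaderSessionID;

    /** Starts accepting heartbeats for the given leader session. */
    void activate(UUID newLeaderSessionID) {
        if (newLeaderSessionID == null) {
            throw new IllegalArgumentException("leader session ID must not be null");
        }
        this.leaderSessionID = newLeaderSessionID;
    }

    /** Stops accepting heartbeats until the next activation. */
    void deactivate() {
        this.leaderSessionID = null;
    }

    boolean isActive() {
        return leaderSessionID != null;
    }

    /** Returns true only for messages that carry the current leader session ID. */
    boolean accepts(UUID sessionID) {
        return isActive() && leaderSessionID.equals(sessionID);
    }
}

/** Hypothetical resource manager that reuses one heartbeat manager across leadership changes. */
class ResourceManagerSketch {
    // created exactly once, instead of once per granted leadership
    private final ReusableHeartbeatManager heartbeatManager = new ReusableHeartbeatManager();

    void handleGrantLeadership(UUID newLeaderSessionID) {
        heartbeatManager.activate(newLeaderSessionID);
    }

    void handleRevokeLeadership() {
        heartbeatManager.deactivate();
    }

    ReusableHeartbeatManager heartbeatManager() {
        return heartbeatManager;
    }
}
```

With this shape, a leadership change only swaps the session ID held by the existing object, and heartbeats tagged with an older session ID are rejected by `accepts`.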


> Heartbeat Manager between ResourceManager and TaskExecutor
> ----------------------------------------------------------
>
>                 Key: FLINK-4449
>                 URL: https://issues.apache.org/jira/browse/FLINK-4449
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> The HeartbeatManager is responsible for the heartbeat between the ResourceManager and the TaskExecutors.
> 1. Register TaskExecutors
> Register heartbeat targets. If the heartbeat response from a target is not reported in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeats
> Trigger heartbeats from the ResourceManager to the TaskExecutors periodically.
> Each TaskExecutor reports its slot allocation in the heartbeat response.
> The ResourceManager syncs its own slot allocation with the heartbeat response.
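
The target registration and timeout detection described in the issue can be sketched roughly as follows. This is a simplified illustration, not the implementation from the pull request: the class `HeartbeatTargetTracker`, its methods, and the use of plain string IDs and explicit timestamps are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker of heartbeat targets; names and types are illustrative only. */
class HeartbeatTargetTracker {
    private final long timeoutMillis;
    // resource ID -> timestamp of the last heartbeat response (or of registration)
    private final Map<String, Long> lastResponseMillis = new HashMap<>();

    HeartbeatTargetTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Step 1: register a TaskExecutor as a heartbeat target. */
    void registerTarget(String resourceId, long nowMillis) {
        lastResponseMillis.put(resourceId, nowMillis);
    }

    /** Records a heartbeat response; responses from unregistered targets are ignored. */
    void reportHeartbeatResponse(String resourceId, long nowMillis) {
        lastResponseMillis.computeIfPresent(resourceId, (id, previous) -> nowMillis);
    }

    /**
     * Removes and returns every target whose last response is older than the timeout.
     * A caller would mark these targets as failed and notify the resource manager.
     */
    List<String> removeTimedOutTargets(long nowMillis) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastResponseMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                failed.add(entry.getKey());
            }
        }
        lastResponseMillis.keySet().removeAll(failed);
        return failed;
    }
}
```

Passing the current time as a parameter keeps the sketch deterministic. A periodic task on the resource manager side could call `removeTimedOutTargets` after each heartbeat round.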



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
