flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7331) Remove Flink's futures from ResourceManager
Date Tue, 01 Aug 2017 12:23:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108806#comment-16108806
] 

ASF GitHub Bot commented on FLINK-7331:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4444#discussion_r130592145
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
---
    @@ -653,6 +565,130 @@ UUID getLeaderSessionId() {
     	//  Internal methods
     	// ------------------------------------------------------------------------
     
    +	/**
    +	 * Registers a new JobMaster.
    +	 *
    +	 * <p>Three cases are distinguished by looking up {@code jobId} in {@code jobManagerRegistrations}:
    +	 * <ul>
    +	 *   <li>a registration exists and carries the same leader id: nothing is replaced, only a
    +	 *       debug message is logged;</li>
    +	 *   <li>a registration exists with a different leader id: the old job manager is disconnected
    +	 *       and a fresh registration is stored in both registration maps;</li>
    +	 *   <li>no registration exists: a fresh registration is stored in both registration maps.</li>
    +	 * </ul>
    +	 *
    +	 * <p>In all three cases the method afterwards logs the registration at info level, hands a
    +	 * heartbeat target for {@code jobManagerResourceId} to the job manager heartbeat manager and
    +	 * returns a {@code JobMasterRegistrationSuccess}.
    +	 *
    +	 * @param jobMasterGateway to communicate with the registering JobMaster
    +	 * @param jobLeaderId leader id of the JobMaster
    +	 * @param jobId of the job for which the JobMaster is responsible
    +	 * @param jobManagerAddress address of the JobMaster (only used for log messages here)
    +	 * @param jobManagerResourceId ResourceID of the JobMaster
    +	 * @return RegistrationResponse; this method always returns a {@code JobMasterRegistrationSuccess}
    +	 *         built from the configured heartbeat interval, the leader session id and {@code resourceId}
    +	 */
    +	private RegistrationResponse registerJobMasterInternal(
    +		final JobMasterGateway jobMasterGateway,
    +		UUID jobLeaderId,
    +		JobID jobId,
    +		String jobManagerAddress,
    +		ResourceID jobManagerResourceId) {
    +		if (jobManagerRegistrations.containsKey(jobId)) {
    +			JobManagerRegistration oldJobManagerRegistration = jobManagerRegistrations.get(jobId);
    +
    +			if (oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
    +				// same registration: keep the existing entry untouched
    +				log.debug("Job manager {}@{} was already registered.", jobLeaderId, jobManagerAddress);
    +			} else {
    +				// tell old job manager that he is no longer the job leader
    +				disconnectJobManager(
    +					oldJobManagerRegistration.getJobID(),
    +					new Exception("New job leader for job " + jobId + " found."));
    +
    +				// store the registration of the new leader under both the job id and its resource id
    +				JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
    +					jobId,
    +					jobManagerResourceId,
    +					jobLeaderId,
    +					jobMasterGateway);
    +				jobManagerRegistrations.put(jobId, jobManagerRegistration);
    +				jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
    +			}
    +		} else {
    +			// new registration for the job, indexed by job id and by the job manager's resource id
    +			JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
    +				jobId,
    +				jobManagerResourceId,
    +				jobLeaderId,
    +				jobMasterGateway);
    +			jobManagerRegistrations.put(jobId, jobManagerRegistration);
    +			jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
    +		}
    +
    +		// NOTE(review): this is logged even when the job manager was already registered above
    +		log.info("Registered job manager {}@{} for job {}.", jobLeaderId, jobManagerAddress,
jobId);
    +
    +		// NOTE(review): monitorTarget is also invoked again for an already registered job manager;
    +		// presumably the heartbeat manager tolerates repeated targets - confirm
    +		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>()
{
    +			@Override
    +			public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    +				// the ResourceManager will always send heartbeat requests to the JobManager
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Void payload) {
    +				// forward the heartbeat request to the JobMaster through its gateway
    +				jobMasterGateway.heartbeatFromResourceManager(resourceID);
    +			}
    +		});
    +
    +		return new JobMasterRegistrationSuccess(
    +			resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
    +			getLeaderSessionId(),
    +			resourceId);
    +	}
    +
    +	/**
    +	 * Registers a new TaskExecutor.
    +	 *
    +	 * @param taskExecutorGateway to communicate with the registering TaskExecutor
    +	 * @param taskExecutorAddress address of the TaskExecutor
    +	 * @param taskExecutorResourceId ResourceID of the TaskExecutor
    +	 * @param slotReport initial slot report from the TaskExecutor
    +	 * @return RegistrationResponse
    +	 */
    +	private RegistrationResponse registerTaskExecutorInternal(
    +		TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress,
    +		ResourceID taskExecutorResourceId,
    +		SlotReport slotReport) {
    +		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
    +		if (oldRegistration != null) {
    +			// TODO :: suggest old taskExecutor to stop itself
    +			log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
    +
    +			// remove old task manager registration from slot manager
    +			slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
    +		}
    +
    +		final WorkerType newWorker = workerStarted(taskExecutorResourceId);
    +
    +		if(newWorker == null) {
    --- End diff --
    
    Good catch. Will fix it.


> Remove Flink's futures from ResourceManager
> -------------------------------------------
>
>                 Key: FLINK-7331
>                 URL: https://issues.apache.org/jira/browse/FLINK-7331
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>             Fix For: 1.4.0
>
>
> Remove only internally used Flink {{Futures}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message