flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #4444: [FLINK-7331] [futures] Replace Flink's Future with...
Date Tue, 01 Aug 2017 12:22:13 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4444#discussion_r130592145
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
---
    @@ -653,6 +565,130 @@ UUID getLeaderSessionId() {
     	//  Internal methods
     	// ------------------------------------------------------------------------
     
    +	/**
    +	 * Registers a new JobMaster.
    +	 *
    +	 * @param jobMasterGateway to communicate with the registering JobMaster
    +	 * @param jobLeaderId leader id of the JobMaster
    +	 * @param jobId of the job for which the JobMaster is responsible
    +	 * @param jobManagerAddress address of the JobMaster
    +	 * @param jobManagerResourceId ResourceID of the JobMaster
    +	 * @return RegistrationResponse
    +	 */
    +	private RegistrationResponse registerJobMasterInternal(
    +		final JobMasterGateway jobMasterGateway,
    +		UUID jobLeaderId,
    +		JobID jobId,
    +		String jobManagerAddress,
    +		ResourceID jobManagerResourceId) {
    +		if (jobManagerRegistrations.containsKey(jobId)) {
    +			JobManagerRegistration oldJobManagerRegistration = jobManagerRegistrations.get(jobId);
    +
    +			if (oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
    +				// same registration
    +				log.debug("Job manager {}@{} was already registered.", jobLeaderId, jobManagerAddress);
    +			} else {
    +				// tell old job manager that he is no longer the job leader
    +				disconnectJobManager(
    +					oldJobManagerRegistration.getJobID(),
    +					new Exception("New job leader for job " + jobId + " found."));
    +
    +				JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
    +					jobId,
    +					jobManagerResourceId,
    +					jobLeaderId,
    +					jobMasterGateway);
    +				jobManagerRegistrations.put(jobId, jobManagerRegistration);
    +				jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
    +			}
    +		} else {
    +			// new registration for the job
    +			JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
    +				jobId,
    +				jobManagerResourceId,
    +				jobLeaderId,
    +				jobMasterGateway);
    +			jobManagerRegistrations.put(jobId, jobManagerRegistration);
    +			jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
    +		}
    +
    +		log.info("Registered job manager {}@{} for job {}.", jobLeaderId, jobManagerAddress,
jobId);
    +
    +		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>()
{
    +			@Override
    +			public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    +				// the ResourceManager will always send heartbeat requests to the JobManager
    +			}
    +
    +			@Override
    +			public void requestHeartbeat(ResourceID resourceID, Void payload) {
    +				jobMasterGateway.heartbeatFromResourceManager(resourceID);
    +			}
    +		});
    +
    +		return new JobMasterRegistrationSuccess(
    +			resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
    +			getLeaderSessionId(),
    +			resourceId);
    +	}
    +
    +	/**
    +	 * Registers a new TaskExecutor.
    +	 *
    +	 * @param taskExecutorGateway to communicate with the registering TaskExecutor
    +	 * @param taskExecutorAddress address of the TaskExecutor
    +	 * @param taskExecutorResourceId ResourceID of the TaskExecutor
    +	 * @param slotReport initial slot report from the TaskExecutor
    +	 * @return RegistrationResponse
    +	 */
    +	private RegistrationResponse registerTaskExecutorInternal(
    +		TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress,
    +		ResourceID taskExecutorResourceId,
    +		SlotReport slotReport) {
    +		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
    +		if (oldRegistration != null) {
    +			// TODO :: suggest old taskExecutor to stop itself
    +			log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
    +
    +			// remove old task manager registration from slot manager
    +			slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
    +		}
    +
    +		final WorkerType newWorker = workerStarted(taskExecutorResourceId);
    +
    +		if(newWorker == null) {
    --- End diff --
    
    Good catch. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message