flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4738) Port TaskManager logic to TaskExecutor
Date Mon, 10 Oct 2016 08:43:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561670#comment-15561670 ]

ASF GitHub Bot commented on FLINK-4738:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2594#discussion_r82563262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
---
    @@ -127,12 +195,423 @@ public void start() {
     		}
     	}
     
    +	/**
    +	 * Called to shut down the TaskManager. The method closes all TaskManager services.
    +	 *
    +	 * <p>Each service is shut down inside its own try/catch block, so an exception raised by one
    +	 * service is logged and the remaining services are still shut down. Shutdown order:
    +	 * ResourceManager connection (only if it reports being connected), IOManager,
    +	 * MemoryManager, network environment, file cache and finally the metric registry.
    +	 */
    +	@Override
    +	public void shutDown() {
    +		log.info("Stopping TaskManager {}.", getAddress());
    +
    +		// NOTE(review): assumes resourceManagerConnection is non-null here -- confirm it is
    +		// always initialized before shutDown() can be invoked.
    +		if (resourceManagerConnection.isConnected()) {
    +			try {
    +				resourceManagerConnection.close();
    +			} catch (Exception e) {
    +				log.error("Could not cleanly close the ResourceManager connection.", e);
    +			}
    +		}
    +
    +		try {
    +			ioManager.shutdown();
    +		} catch (Exception e) {
    +			log.error("IOManager did not shut down properly.", e);
    +		}
    +
    +		try {
    +			memoryManager.shutdown();
    +		} catch (Exception e) {
    +			log.error("MemoryManager did not shut down properly.", e);
    +		}
    +
    +		try {
    +			networkEnvironment.shutdown();
    +		} catch (Exception e) {
    +			log.error("Network environment did not shut down properly.", e);
    +		}
    +
    +		try {
    +			fileCache.shutdown();
    +		} catch (Exception e) {
    +			log.error("File cache did not shut down properly.", e);
    +		}
    +
    +		// Shut down last so the services above can still report metrics while closing.
    +		try {
    +			metricRegistry.shutdown();
    +		} catch (Exception e) {
    +			log.error("MetricRegistry did not shut down properly.", e);
    +		}
    +
    +		log.info("Stopped TaskManager {}.", getAddress());
    +	}
    +
    +	// ========================================================================
    +	//  RPC methods
    +	// ========================================================================
    +
    +	// ----------------------------------------------------------------------
    +	// Task lifecycle RPCs
    +	// ----------------------------------------------------------------------
    +
    +	/**
    +	 * Submits the task described by the given deployment descriptor for execution.
    +	 *
    +	 * <p>The submission is rejected if no connection is registered for the given JobManager or
    +	 * if no task slot is allocated for the descriptor's allocation ID. Otherwise a {@link Task}
    +	 * is built from the TaskManager services and the JobManager connection's components, added
    +	 * to the slot, recorded in {@code taskSlotMappings} under its execution ID, and its task
    +	 * thread is started.
    +	 *
    +	 * @param tdd deployment descriptor of the task to run
    +	 * @param jobManagerID resource ID of the JobManager submitting the task
    +	 * @return acknowledge after {@code task.startTaskThread()} has been called
    +	 * @throws TaskSubmissionException if the JobManager is not associated, no slot is allocated
    +	 *                                 for the allocation ID, or {@code taskSlot.add(task)}
    +	 *                                 returns false (reported as a duplicate task id)
    +	 */
    +	@RpcMethod
    +	public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID)
throws TaskSubmissionException {
    +
    +		// Only JobManagers with an established connection may submit tasks.
    +		JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID);
    +
    +		if (jobManagerConnection == null) {
    +			final String message = "Could not submit task because JobManager " + jobManagerID
+
    +				" was not associated.";
    +
    +			log.debug(message);
    +			throw new TaskSubmissionException(message);
    +		}
    +
    +		// The task must run in a slot previously allocated under the descriptor's allocation ID.
    +		TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
    +
    +		if (taskSlot == null) {
    +			final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID()
+ '.';
    +			log.debug(message);
    +			throw new TaskSubmissionException(message);
    +		}
    +
    +		TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd);
    +
    +		// Input splits are requested from the submitting JobManager via RPC.
    +		InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
    +			jobManagerConnection.getJobManagerGateway(),
    +			tdd.getJobID(),
    +			tdd.getVertexID(),
    +			tdd.getExecutionId(),
    +			taskManagerConfiguration.getTimeout());
    +
    +		// Per-JobManager components the task uses to talk back to its JobManager.
    +		TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
    +		CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
    +		LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
    +		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
    +		PartitionStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
    +
    +		Task task = new Task(
    +			tdd,
    +			memoryManager,
    +			ioManager,
    +			networkEnvironment,
    +			broadcastVariableManager,
    +			taskManagerActions,
    +			inputSplitProvider,
    +			checkpointResponder,
    +			libraryCache,
    +			fileCache,
    +			taskManagerRuntimeInfo,
    +			taskMetricGroup,
    +			resultPartitionConsumableNotifier,
    +			partitionStateChecker,
    +			getRpcService().getExecutor());
    +
    +		log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
    +
    +		// NOTE(review): taskMetricGroup and the Task are created before add() is checked; on the
    +		// failure branch neither is cleaned up here -- confirm whether the metric group must be
    +		// closed on that path.
    +		if(taskSlot.add(task)) {
    +			TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot);
    +
    +			taskSlotMappings.put(task.getExecutionId(), taskSlotMapping);
    +			task.startTaskThread();
    +
    +			return Acknowledge.get();
    +		} else {
    +			final String message = "TaskManager already contains a task for id " +
    +				task.getExecutionId() + '.';
    +
    +			log.debug(message);
    +			throw new TaskSubmissionException(message);
    +		}
    +	}
    +
    +	@RpcMethod
    +	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException
{
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			try {
    +				task.cancelExecution();
    +				return Acknowledge.get();
    +			} catch (Throwable t) {
    +				throw new TaskException("Cannot cancel task for execution " + executionAttemptID
+ '.', t);
    +			}
    +		} else {
    +			final String message = "Cannot find task to stop for execution " + executionAttemptID
+ '.';
    +
    +			log.debug(message);
    +			throw new TaskException(message);
    +		}
    +	}
    +
    +	/**
    +	 * Stops the execution of the task with the given execution attempt ID.
    +	 *
    +	 * @param executionAttemptID execution attempt ID of the task to stop
    +	 * @return acknowledge once {@code task.stopExecution()} has returned
    +	 * @throws TaskException if no task is registered for the ID, or if stopping it threw
    +	 */
    +	@RpcMethod
    +	public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException
{
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			try {
    +				task.stopExecution();
    +				return Acknowledge.get();
    +			} catch (Throwable t) {
    +				// Wrap any failure so the caller receives a TaskException with the original cause.
    +				throw new TaskException("Cannot stop task for execution " + executionAttemptID +
'.', t);
    +			}
    +		} else {
    +			final String message = "Cannot find task to stop for execution " + executionAttemptID
+ '.';
    +
    +			log.debug(message);
    +			throw new TaskException(message);
    +		}
    +	}
    +
    +	// ----------------------------------------------------------------------
    +	// Partition lifecycle RPCs
    +	// ----------------------------------------------------------------------
    +
    +	/**
    +	 * Updates the input channels of the given task with new partition information.
    +	 *
    +	 * <p>For every {@link PartitionInfo}, the task's input gate for the partition's intermediate
    +	 * data set ID is looked up and the channel update is scheduled asynchronously on the RPC
    +	 * service. If an asynchronous update fails with an IOException or InterruptedException, the
    +	 * task is failed externally. If the task is not found, the update is discarded and still
    +	 * acknowledged.
    +	 *
    +	 * @param executionAttemptID execution attempt ID of the task whose inputs are updated
    +	 * @param partitionInfos partition infos describing the updated input channels
    +	 * @return acknowledge, both when updates were scheduled and when the task is unknown
    +	 * @throws PartitionException if the task has no input gate for one of the data set IDs
    +	 */
    +	@RpcMethod
    +	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo>
partitionInfos) throws PartitionException {
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			for (final PartitionInfo partitionInfo: partitionInfos) {
    +				// NOTE(review): despite its name, this holds an IntermediateDataSetID, not a
    +				// result partition ID.
    +				IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
    +
    +				final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
    +
    +				if (singleInputGate != null) {
    +					// Run asynchronously because it might be blocking
    +					getRpcService().execute(new Runnable() {
    +						@Override
    +						public void run() {
    +							try {
    +								singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
    +							} catch (IOException | InterruptedException e) {
    +								// NOTE(review): the thread's interrupt flag is not restored after an
    +								// InterruptedException -- confirm this is intended.
    +								log.error("Could not update input data location for task {}. Trying to fail task.",
task.getTaskInfo().getTaskName(), e);
    +
    +								try {
    +									task.failExternally(e);
    +								} catch (RuntimeException re) {
    +									// TODO: Check whether we need this or make exception in failExternally checked
    +									log.error("Failed canceling task with execution ID {} after task update failure.",
executionAttemptID, re);
    +								}
    +							}
    +						}
    +					});
    +				} else {
    +					// NOTE(review): updates for earlier entries of partitionInfos may already have
    +					// been scheduled, so throwing here applies the update only partially; consider
    +					// validating all gates before scheduling any update.
    +					throw new PartitionException("No reader with ID " +
    +						intermediateResultPartitionID + " for task " + executionAttemptID +
    +						" was found.");
    +				}
    +			}
    +
    +			return Acknowledge.get();
    +		} else {
    +			log.debug("Discard update for input partitions of task {}. Task is no longer running.",
executionAttemptID);
    +			return Acknowledge.get();
    +		}
    +	}
    +
    +	/**
    +	 * Releases all result partitions produced by the given task execution.
    +	 *
    +	 * <p>Any Throwable raised while releasing the partitions is passed to {@code onFatalError}.
    +	 *
    +	 * @param executionAttemptID execution attempt ID whose produced partitions are released
    +	 */
    +	@RpcMethod
    +	public void failPartition(ExecutionAttemptID executionAttemptID) {
    +		log.info("Discarding the results produced by task execution {}.", executionAttemptID);
    +
    +		try {
    +			networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
    +		} catch (Throwable t) {
    +			// TODO: Do we still need this catch branch?
    +			onFatalError(t);
    +		}
    +
    +		// TODO: Maybe it's better to return an Acknowledge here to notify the JM about the
    +		// success/failure with an Exception.
    +	}
    +
    +	// ----------------------------------------------------------------------
    +	// Checkpointing RPCs
    +	// ----------------------------------------------------------------------
    +
    +	/**
    +	 * Triggers a checkpoint barrier for the given task.
    +	 *
    +	 * @param executionAttemptID execution attempt ID of the task to checkpoint
    +	 * @param checkpointId ID of the checkpoint to trigger
    +	 * @param checkpointTimestamp timestamp of the checkpoint
    +	 * @return acknowledge once {@code task.triggerCheckpointBarrier(...)} has been called
    +	 * @throws CheckpointException if no task is registered for the execution attempt ID
    +	 */
    +	@RpcMethod
    +	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId,
long checkpointTimestamp) throws CheckpointException {
    +		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    +
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
    +
    +			return Acknowledge.get();
    +		} else {
    +			final String message = "TaskManager received a checkpoint request for unknown task
" + executionAttemptID + '.';
    +
    +			log.debug(message);
    +			throw new CheckpointException(message);
    +		}
    +	}
    +
    --- End diff --
    
    Good catch. Will fix it.


> Port TaskManager logic to TaskExecutor
> --------------------------------------
>
>                 Key: FLINK-4738
>                 URL: https://issues.apache.org/jira/browse/FLINK-4738
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> Port the basic operations of the {{TaskManager}} to the {{TaskExecutor}}. These operations
> include the task lifecycle methods, the {{JobManager}} association logic, and the setup of
> TaskManager components.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message