Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,1169 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.MapReduceChildJVM;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ContainerBuilderHelper;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Implementation of TaskAttempt interface.
+ */
+@SuppressWarnings("all")
+public abstract class TaskAttemptImpl implements
+ org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
+ EventHandler<TaskAttemptEvent> {
+
+ private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+
+ protected final Configuration conf;
+ protected final Path jobFile;
+ protected final int partition;
+ protected final EventHandler eventHandler;
+ private final TaskAttemptID attemptId;
+ private final org.apache.hadoop.mapred.JobID oldJobId;
+ private final TaskAttemptListener taskAttemptListener;
+ private final OutputCommitter committer;
+ private final Resource resourceCapability;
+ private final String[] dataLocalHosts;
+ private final List<CharSequence> diagnostics = new ArrayList<CharSequence>();
+ private final Lock readLock;
+ private final Lock writeLock;
+ private Collection<Token<? extends TokenIdentifier>> fsTokens;
+ private Token<JobTokenIdentifier> jobToken;
+
+ private long launchTime;
+ private long finishTime;
+
+ private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
+ new CleanupContainerTransition();
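+
+  // Summary of the lifecycle encoded by the transitions below: a healthy
+  // attempt moves NEW -> UNASSIGNED -> ASSIGNED -> RUNNING ->
+  // (optionally COMMIT_PENDING ->) SUCCESS_CONTAINER_CLEANUP -> SUCCEEDED;
+  // failures, kills and timeouts divert through the FAIL_*/KILL_* container-
+  // and task-cleanup states before landing in FAILED or KILLED.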
+ private static final StateMachineFactory
+ <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ stateMachineFactory
+ = new StateMachineFactory
+ <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ (TaskAttemptState.NEW)
+
+ // Transitions from the NEW state.
+ .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+ TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition())
+ .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
+ TaskAttemptEventType.TA_KILL, new KilledTransition())
+
+ // Transitions from the UNASSIGNED state.
+ .addTransition(TaskAttemptState.UNASSIGNED,
+ TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+ new ContainerAssignedTransition())
+ .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
+ TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
+ TaskAttemptState.KILLED, true))
+
+ // Transitions from the ASSIGNED state.
+ .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ new LaunchedContainerTransition())
+ .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+ new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
+ .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.KILLED,
+ TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
+ TaskAttemptState.KILLED, false))
+
+ // Transitions from RUNNING state.
+ .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+ TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
+ .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ new DiagnosticInformationUpdater())
+     // If no commit is required, the task goes directly to success
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+ // If commit is required, task goes through commit pending state.
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
+ // Failure handling while RUNNING
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+ //for handling container exit without sending the done or fail msg
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ CLEANUP_CONTAINER_TRANSITION)
+ // Timeout handling while RUNNING
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+ // Kill handling
+ .addTransition(TaskAttemptState.RUNNING,
+ TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+ CLEANUP_CONTAINER_TRANSITION)
+
+ // Transitions from COMMIT_PENDING state
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
+ new StatusUpdater())
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ new DiagnosticInformationUpdater())
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+ CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptState.COMMIT_PENDING,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+
+ // Transitions from SUCCESS_CONTAINER_CLEANUP state
+ // kill and cleanup the container
+ .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
+ new SucceededTransition())
+ // Ignore-able events
+ .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_TIMED_OUT,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED))
+
+ // Transitions from FAIL_CONTAINER_CLEANUP state.
+ .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptState.FAIL_TASK_CLEANUP,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
+ // Ignore-able events
+ .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_TIMED_OUT))
+
+ // Transitions from KILL_CONTAINER_CLEANUP
+ .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
+ TaskAttemptState.KILL_TASK_CLEANUP,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
+ // Ignore-able events
+ .addTransition(
+ TaskAttemptState.KILL_CONTAINER_CLEANUP,
+ TaskAttemptState.KILL_CONTAINER_CLEANUP,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_TIMED_OUT))
+
+ // Transitions from FAIL_TASK_CLEANUP
+ // run the task cleanup
+ .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
+ TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
+ new FailedTransition())
+ // Ignore-able events
+ .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
+ TaskAttemptState.FAIL_TASK_CLEANUP,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_FAILMSG))
+
+ // Transitions from KILL_TASK_CLEANUP
+ .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
+ TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
+ new KilledTransition())
+ // Ignore-able events
+ .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
+ TaskAttemptState.KILL_TASK_CLEANUP,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_FAILMSG))
+
+ // Transitions from SUCCEEDED
+ .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
+ TaskAttemptState.FAILED,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
+ new TooManyFetchFailureTransition())
+ // Ignore-able events for SUCCEEDED state
+ .addTransition(TaskAttemptState.SUCCEEDED,
+ TaskAttemptState.SUCCEEDED,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED))
+
+ // Ignore-able events for FAILED state
+ .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_FAILMSG))
+
+ // Ignore-able events for KILLED state
+ .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_ASSIGNED,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_FAILMSG))
+
+ // create the topology tables
+ .installTopology();
+
+ private final StateMachine
+ <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+ stateMachine;
+
+ private ContainerID containerID;
+ private String containerMgrAddress;
+ private WrappedJvmID jvmID;
+ private ContainerToken containerToken;
+
+  //this takes a good amount of memory (~30KB). Instantiate it lazily
+  //and null it out once the task is launched.
+ private org.apache.hadoop.mapred.Task remoteTask;
+
+ //this is the last status reported by the REMOTE running attempt
+ private TaskAttemptStatus reportedStatus;
+
+ public TaskAttemptImpl(TaskID taskId, int i, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
+ Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken,
+ Collection<Token<? extends TokenIdentifier>> fsTokens) {
+ oldJobId = TypeConverter.fromYarn(taskId.jobID);
+ this.conf = conf;
+ attemptId = new TaskAttemptID();
+ attemptId.taskID = taskId;
+ attemptId.id = i;
+ this.taskAttemptListener = taskAttemptListener;
+
+ // Initialize reportedStatus
+ reportedStatus = new TaskAttemptStatus();
+ initTaskAttemptStatus(reportedStatus);
+
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ readLock = readWriteLock.readLock();
+ writeLock = readWriteLock.writeLock();
+
+ this.fsTokens = fsTokens;
+ this.jobToken = jobToken;
+ this.eventHandler = eventHandler;
+ this.committer = committer;
+ this.jobFile = jobFile;
+ this.partition = partition;
+
+ //TODO:create the resource reqt for this Task attempt
+ this.resourceCapability = new Resource();
+ this.resourceCapability.memory = getMemoryRequired(conf, taskId.taskType);
+ this.dataLocalHosts = dataLocalHosts;
+
+ // This "this leak" is okay because the retained pointer is in an
+ // instance variable.
+ stateMachine = stateMachineFactory.make(this);
+ }
+
+ private int getMemoryRequired(Configuration conf, TaskType taskType) {
+ int memory = 1024;
+ if (taskType == TaskType.MAP) {
+ memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
+ } else if (taskType == TaskType.REDUCE) {
+ memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
+ }
+
+    return memory;
+ }
+
+ private static LocalResource getLocalResource(FileContext fc, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility)
+ throws IOException {
+ FileStatus fstat = fc.getFileStatus(file);
+ LocalResource resource = new LocalResource();
+ resource.resource = AvroUtil.getYarnUrlFromPath(fstat.getPath());
+ resource.type = type;
+ resource.state = visibility;
+ resource.size = fstat.getLen();
+ resource.timestamp = fstat.getModificationTime();
+ return resource;
+ }
+
+ private ContainerLaunchContext getContainer() {
+
+ ContainerLaunchContext container = new ContainerLaunchContext();
+ container.serviceData = new HashMap<CharSequence, ByteBuffer>();
+ container.resources = new HashMap<CharSequence, LocalResource>();
+ container.env = new HashMap<CharSequence, CharSequence>();
+
+
+ try {
+ FileContext remoteFS = FileContext.getFileContext(conf);
+
+ Path localizedJobConf = new Path(YARNApplicationConstants.JOB_CONF_FILE);
+      remoteTask.setJobFile(localizedJobConf.toString()); // TODO: this is broken
+ URL jobConfFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(localizedJobConf);
+ LOG.info("The job-conf file on the remote FS is " + jobConfFileOnRemoteFS);
+
+ Path jobJar = remoteFS.makeQualified(new Path(remoteTask.getConf().get(MRJobConfig.JAR)));
+ URL jobJarFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(jobJar);
+ container.resources.put(YARNApplicationConstants.JOB_JAR,
+ getLocalResource(remoteFS, jobJar,
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ LOG.info("The job-jar file on the remote FS is " + jobJarFileOnRemoteFS);
+
+ Path jobSubmitDir =
+ new Path(conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY),
+ oldJobId.toString());
+ Path jobTokenFile =
+ remoteFS.makeQualified(new Path(jobSubmitDir,
+ YarnConfiguration.APPLICATION_TOKENS_FILE));
+ URL applicationTokenFileOnRemoteFS =
+ AvroUtil.getYarnUrlFromPath(jobTokenFile);
+ // TODO: Looks like this is not needed. Revisit during localization
+ // cleanup.
+ //container.resources_todo.put(YarnConfiguration.APPLICATION_TOKENS_FILE,
+ // getLocalResource(remoteFS, jobTokenFile,
+ // LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ container.resources.put(YARNApplicationConstants.JOB_CONF_FILE,
+ getLocalResource(remoteFS,
+ new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ LOG.info("The application token file on the remote FS is "
+ + applicationTokenFileOnRemoteFS);
+
+ // Setup DistributedCache
+ setupDistributedCache(conf, container);
+
+      // Set up tokens
+ Credentials taskCredentials = new Credentials();
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // Add file-system tokens
+ for (Token<? extends TokenIdentifier> token : fsTokens) {
+ LOG.info("Putting fs-token for NM use for launching container : "
+ + token.getIdentifier().toString());
+ taskCredentials.addToken(token.getService(), token);
+ }
+ }
+
+      // LocalStorageToken is needed irrespective of whether security is
+      // enabled.
+ TokenCache.setJobToken(jobToken, taskCredentials);
+
+ DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+ LOG.info("Size of containertokens_dob is "
+ + taskCredentials.numberOfTokens());
+ taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+ container.containerTokens =
+ ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+ containerTokens_dob.getLength());
+
+ // Add shuffle token
+ LOG.info("Putting shuffle token in serviceData");
+ DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+ jobToken.write(jobToken_dob);
+ // TODO: should depend on ShuffleHandler
+ container.serviceData.put("mapreduce.shuffle",
+ ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()));
+
+ MRApps.setInitialClasspath(container.env);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ container.id = containerID;
+ container.user = conf.get(MRJobConfig.USER_NAME); // TODO: Fix
+
+ File workDir = new File(ContainerBuilderHelper.getWorkDir());
+ String logDir = new File(workDir, "logs").toString();
+ String childTmpDir = new File(workDir, "tmp").toString();
+ String javaHome = "${JAVA_HOME}";
+ String nmLdLibraryPath =
+        ContainerBuilderHelper.getEnvVar("LD_LIBRARY_PATH");
+ List<String> classPaths = new ArrayList<String>();
+
+ String localizedApplicationTokensFile =
+ new File(workDir, YarnConfiguration.APPLICATION_TOKENS_FILE)
+ .toString();
+ classPaths.add(YARNApplicationConstants.JOB_JAR);
+ classPaths.add(YARNApplicationConstants.YARN_MAPREDUCE_APP_JAR_PATH);
+ classPaths.add(workDir.toString()); // TODO
+
+ // Construct the actual Container
+ container.command =
+ MapReduceChildJVM.getVMCommand(taskAttemptListener.getAddress(),
+ remoteTask, javaHome, workDir.toString(), logDir,
+ childTmpDir, jvmID);
+
+ MapReduceChildJVM.setVMEnv(container.env, classPaths, workDir.toString(),
+ nmLdLibraryPath, remoteTask, localizedApplicationTokensFile);
+
+    container.resource = resourceCapability;
+ return container;
+ }
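+
+  // Note: the launch context assembled above carries (a) local resources for
+  // the job jar, job conf and distributed-cache entries, (b) the serialized
+  // task credentials as container tokens, (c) the job token under the
+  // "mapreduce.shuffle" service-data key, and (d) the child-JVM command and
+  // environment built via MapReduceChildJVM.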
+
+ private void setupDistributedCache(Configuration conf,
+ ContainerLaunchContext container) throws IOException {
+
+ // Cache archives
+ parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE,
+ DistributedCache.getCacheArchives(conf),
+ DistributedCache.getArchiveTimestamps(conf),
+ getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
+ DistributedCache.getArchiveVisibilities(conf),
+ DistributedCache.getArchiveClassPaths(conf));
+
+ // Cache files
+ parseDistributedCacheArtifacts(container, LocalResourceType.FILE,
+ DistributedCache.getCacheFiles(conf),
+ DistributedCache.getFileTimestamps(conf),
+ getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+ DistributedCache.getFileVisibilities(conf),
+ DistributedCache.getFileClassPaths(conf));
+ }
+
+ // TODO - Move this to MR!
+ // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
+ // long[], boolean[], Path[], FileType)
+ private static void parseDistributedCacheArtifacts(
+ ContainerLaunchContext container, LocalResourceType type,
+      URI[] uris, long[] timestamps, long[] sizes, boolean[] visibilities,
+ Path[] classpaths) throws IOException {
+
+ if (uris != null) {
+ // Sanity check
+ if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+ (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #sizes=" + sizes.length +
+            " #visibilities=" + visibilities.length
+            );
+ }
+
+ Map<String, Path> classPaths = new HashMap<String, Path>();
+ if (classpaths != null) {
+ for (Path p : classpaths) {
+ classPaths.put(p.toUri().getPath().toString(), p);
+ }
+ }
+ for (int i = 0; i < uris.length; ++i) {
+ URI u = uris[i];
+ Path p = new Path(u.toString());
+ // Add URI fragment or just the filename
+ Path name = new Path((null == u.getFragment())
+ ? p.getName()
+ : u.getFragment());
+ if (name.isAbsolute()) {
+ throw new IllegalArgumentException("Resource name must be relative");
+ }
+ container.resources.put(
+ name.toUri().getPath(),
+ BuilderUtils.newLocalResource(
+ uris[i], type,
+ visibilities[i]
+ ? LocalResourceVisibility.PUBLIC
+ : LocalResourceVisibility.PRIVATE,
+ sizes[i], timestamps[i])
+ );
+ if (classPaths.containsKey(u.getPath())) {
+ addCacheArtifactToClassPath(container, name.toUri().getPath());
+ }
+ }
+ }
+ }
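+
+  // Illustration (hypothetical URIs): a cache entry such as
+  // hdfs://nn/libs/dep.jar#renamed.jar is localized under the fragment name
+  // "renamed.jar", whereas hdfs://nn/libs/dep.jar with no fragment is
+  // localized under its file name "dep.jar".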
+
+ private static final String CLASSPATH = "CLASSPATH";
+ private static void addCacheArtifactToClassPath(
+ ContainerLaunchContext container, String fileName) {
+ CharSequence classpath = container.env.get(CLASSPATH);
+ if (classpath == null) {
+ classpath = fileName;
+ } else {
+ classpath = classpath + ":" + fileName;
+ }
+ container.env.put(CLASSPATH, classpath);
+ }
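+
+  // For example, if the container env already maps CLASSPATH to "job.jar",
+  // adding the cache artifact "lib/dep.jar" yields "job.jar:lib/dep.jar".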
+
+ // TODO - Move this to MR!
+ private static long[] getFileSizes(Configuration conf, String key) {
+ String[] strs = conf.getStrings(key);
+ if (strs == null) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
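+
+  // For example, a comma-separated conf value of "1024,2048" for the given
+  // key parses to the long array {1024, 2048}; an unset key yields null.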
+
+ @Override
+ public ContainerID getAssignedContainerID() {
+ readLock.lock();
+ try {
+ return containerID;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getLaunchTime() {
+ readLock.lock();
+ try {
+ return launchTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public long getFinishTime() {
+ readLock.lock();
+ try {
+ return finishTime;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+  /** If a container is assigned, returns the container manager address;
+   * otherwise null.
+   */
+ @Override
+ public String getAssignedContainerMgrAddress() {
+ readLock.lock();
+ try {
+ return containerMgrAddress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
+
+ protected abstract int getPriority();
+
+ @Override
+ public TaskAttemptID getID() {
+ return attemptId;
+ }
+
+ @Override
+ public boolean isFinished() {
+ readLock.lock();
+ try {
+ // TODO: Use stateMachine level method?
+ return (getState() == TaskAttemptState.SUCCEEDED ||
+ getState() == TaskAttemptState.FAILED ||
+ getState() == TaskAttemptState.KILLED);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public TaskAttemptReport getReport() {
+ TaskAttemptReport result = new TaskAttemptReport();
+ readLock.lock();
+ try {
+ result.id = attemptId;
+ //take the LOCAL state of attempt
+ //DO NOT take from reportedStatus
+ result.state = getState();
+ result.progress = reportedStatus.progress;
+ result.startTime = launchTime;
+ result.finishTime = finishTime;
+ result.diagnosticInfo = reportedStatus.diagnosticInfo;
+ result.phase = reportedStatus.phase;
+ result.stateString = reportedStatus.stateString;
+ result.counters = getCounters();
+ return result;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public List<CharSequence> getDiagnostics() {
+ List<CharSequence> result = new ArrayList<CharSequence>();
+ readLock.lock();
+ try {
+ result.addAll(diagnostics);
+ return result;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Counters getCounters() {
+ readLock.lock();
+ try {
+ Counters counters = reportedStatus.counters;
+ if (counters == null) {
+ counters = new Counters();
+ counters.groups = new HashMap<CharSequence, CounterGroup>();
+ }
+ return counters;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ readLock.lock();
+ try {
+ return reportedStatus.progress;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public TaskAttemptState getState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void handle(TaskAttemptEvent event) {
+ LOG.info("Processing " + event.getTaskAttemptID() +
+ " of type " + event.getType());
+ writeLock.lock();
+ try {
+ final TaskAttemptState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
+ eventHandler.handle(new JobDiagnosticsUpdateEvent(
+ this.attemptId.taskID.jobID, "Invalid event " + event.getType() +
+ " on TaskAttempt " + this.attemptId));
+ eventHandler.handle(new JobEvent(this.attemptId.taskID.jobID,
+ JobEventType.INTERNAL_ERROR));
+ }
+ if (oldState != getState()) {
+ LOG.info(attemptId + " TaskAttempt Transitioned from "
+ + oldState + " to "
+ + getState());
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ //always called in write lock
+ private void setFinishTime() {
+ //set the finish time only if launch time is set
+ if (launchTime != 0) {
+ finishTime = System.currentTimeMillis();
+ }
+ }
+
+ private static String[] racks = new String[] {NetworkTopology.DEFAULT_RACK};
+ private static class RequestContainerTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ // Tell any speculator that we're requesting a container
+ taskAttempt.eventHandler.handle
+ (new SpeculatorEvent(taskAttempt.getID().taskID, +1));
+ //request for container
+ taskAttempt.eventHandler.handle(
+ new ContainerRequestEvent(taskAttempt.attemptId,
+ taskAttempt.resourceCapability,
+ taskAttempt.getPriority(), taskAttempt.dataLocalHosts, racks));
+ }
+ }
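+
+  // Convention for the SpeculatorEvent(taskID, n) calls in the transitions:
+  // the second argument adjusts the speculator's count of outstanding
+  // container requests for this task (+1 when a container is requested,
+  // -1 when one is assigned or the request is withdrawn).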
+
+ private static class ContainerAssignedTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(final TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ TaskAttemptContainerAssignedEvent cEvent =
+ (TaskAttemptContainerAssignedEvent) event;
+ taskAttempt.containerID = cEvent.getContainerID();
+ taskAttempt.containerMgrAddress = cEvent.getContainerManagerAddress();
+ taskAttempt.containerToken = cEvent.getContainerToken();
+ taskAttempt.remoteTask = taskAttempt.createRemoteTask();
+ taskAttempt.jvmID = new WrappedJvmID(
+ taskAttempt.remoteTask.getTaskID().getJobID(),
+ taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.id);
+
+ //launch the container
+ //create the container object to be launched for a given Task attempt
+ taskAttempt.eventHandler.handle(
+ new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
+ taskAttempt.containerID,
+ taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
+ @Override
+ public ContainerLaunchContext getContainer() {
+ return taskAttempt.getContainer();
+ }
+ @Override
+ public Task getRemoteTask() {
+ return taskAttempt.remoteTask;
+ }
+ });
+
+ // send event to speculator that our container needs are satisfied
+ taskAttempt.eventHandler.handle
+ (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+ }
+ }
+
+ private static class DeallocateContainerTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ private final TaskAttemptState finalState;
+ private final boolean withdrawsContainerRequest;
+ DeallocateContainerTransition
+ (TaskAttemptState finalState, boolean withdrawsContainerRequest) {
+ this.finalState = finalState;
+ this.withdrawsContainerRequest = withdrawsContainerRequest;
+ }
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ //set the finish time
+ taskAttempt.setFinishTime();
+ //send the deallocate event to ContainerAllocator
+ taskAttempt.eventHandler.handle(
+ new ContainerAllocatorEvent(taskAttempt.attemptId,
+ ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
+
+ // send event to speculator that we withdraw our container needs, if
+ // we're transitioning out of UNASSIGNED
+ if (withdrawsContainerRequest) {
+ taskAttempt.eventHandler.handle
+ (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+ }
+
+ switch(finalState) {
+ case FAILED:
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_FAILED));
+ break;
+ case KILLED:
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_KILLED));
+ break;
+ }
+ }
+ }
+
+ private static class LaunchedContainerTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ //set the launch time
+ taskAttempt.launchTime = System.currentTimeMillis();
+      // register it with TaskAttemptListener so that it starts listening
+      // for it
+ taskAttempt.taskAttemptListener.register(
+ taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
+ TaskAttemptStartedEvent tase =
+ new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ taskAttempt.launchTime,
+ "tracker", 0);
+ taskAttempt.eventHandler.handle
+ (new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tase));
+ taskAttempt.eventHandler.handle
+ (new SpeculatorEvent
+ (taskAttempt.attemptId, true, System.currentTimeMillis()));
+      //null out the remoteTask reference as it is no longer needed,
+      //freeing up the memory it holds
+ taskAttempt.remoteTask = null;
+
+ //tell the Task that attempt has started
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_LAUNCHED));
+ }
+ }
+
+ private static class CommitPendingTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+ }
+ }
+
+ private static class TaskCleanupTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ TaskAttemptContext taskContext =
+ new TaskAttemptContextImpl(taskAttempt.conf,
+ TypeConverter.fromYarn(taskAttempt.attemptId));
+ taskAttempt.eventHandler.handle(new TaskCleanupEvent(
+ taskAttempt.attemptId,
+ taskAttempt.committer,
+ taskContext));
+ }
+ }
+
+ private static class SucceededTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ //set the finish time
+ taskAttempt.setFinishTime();
+ String taskType =
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType).toString();
+ LOG.info("In TaskAttemptImpl taskType: " + taskType);
+ if (taskType.equals("MAP")) {
+ MapAttemptFinishedEvent mfe =
+ new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.SUCCEEDED.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.finishTime, "hostname",
+ TaskAttemptState.SUCCEEDED.toString(),
+ TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+ taskAttempt.eventHandler.handle(
+ new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+ } else {
+ ReduceAttemptFinishedEvent rfe =
+ new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.SUCCEEDED.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.finishTime,
+ taskAttempt.finishTime, "hostname",
+ TaskAttemptState.SUCCEEDED.toString(),
+ TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+ taskAttempt.eventHandler.handle(
+ new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+ }
+ /*
+ TaskAttemptFinishedEvent tfe =
+ new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.SUCCEEDED.toString(),
+ taskAttempt.reportedStatus.finishTime, "hostname",
+ TaskAttemptState.SUCCEEDED.toString(),
+ TypeConverter.fromYarn(taskAttempt.getCounters()));
+ taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tfe));
+ */
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ }
+ }
+
+ private static class FailedTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ //set the finish time
+ taskAttempt.setFinishTime();
+ TaskAttemptUnsuccessfulCompletionEvent ta =
+ new TaskAttemptUnsuccessfulCompletionEvent(
+ TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.FAILED.toString(),
+ taskAttempt.finishTime,
+ "hostname",
+ taskAttempt.reportedStatus.diagnosticInfo.toString());
+ taskAttempt.eventHandler.handle(
+ new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, ta));
+ if (taskAttempt.attemptId.taskID.taskType == TaskType.MAP) {
+ MapAttemptFinishedEvent mfe =
+ new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.FAILED.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.finishTime, "hostname",
+ TaskAttemptState.FAILED.toString(),
+ TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+ taskAttempt.eventHandler.handle(
+ new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+ } else {
+ ReduceAttemptFinishedEvent rfe =
+ new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.FAILED.toString(),
+ taskAttempt.finishTime,
+ taskAttempt.finishTime,
+ taskAttempt.finishTime, "hostname",
+ TaskAttemptState.FAILED.toString(),
+ TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+ taskAttempt.eventHandler.handle(
+ new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+ }
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_FAILED));
+ }
+ }
+
+ private static class TooManyFetchFailureTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+ //add to diagnostic
+      taskAttempt.addDiagnosticInfo("Too many fetch failures. Failing the attempt");
+ //set the finish time
+ taskAttempt.setFinishTime();
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+ }
+ }
+
+ private static class KilledTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ //set the finish time
+ taskAttempt.setFinishTime();
+ TaskAttemptUnsuccessfulCompletionEvent tke =
+ new TaskAttemptUnsuccessfulCompletionEvent(
+ TypeConverter.fromYarn(taskAttempt.attemptId),
+ TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TaskAttemptState.KILLED.toString(),
+ taskAttempt.finishTime,
+ TaskAttemptState.KILLED.toString(),
+ taskAttempt.reportedStatus.diagnosticInfo.toString());
+ taskAttempt.eventHandler.handle(
+ new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tke));
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_KILLED));
+ }
+ }
+
+ private static class CleanupContainerTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+      // unregister it from TaskAttemptListener so that it stops listening
+      // for it
+ taskAttempt.taskAttemptListener.unregister(
+ taskAttempt.attemptId, taskAttempt.jvmID);
+ //send the cleanup event to containerLauncher
+ taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+ taskAttempt.attemptId,
+ taskAttempt.containerID, taskAttempt.containerMgrAddress,
+ taskAttempt.containerToken,
+ ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+ }
+ }
+
+ private void addDiagnosticInfo(CharSequence diag) {
+ if (diag != null && !diag.equals("")) {
+ diagnostics.add(diag);
+ }
+ }
+
+ private static class StatusUpdater
+ implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ // Status update calls don't really change the state of the attempt.
+ TaskAttemptStatus newReportedStatus =
+ ((TaskAttemptStatusUpdateEvent) event)
+ .getReportedTaskAttemptStatus();
+ // Now switch the information in the reportedStatus
+ taskAttempt.reportedStatus = newReportedStatus;
+
+ // send event to speculator about the reported status
+ taskAttempt.eventHandler.handle
+ (new SpeculatorEvent
+ (taskAttempt.reportedStatus, System.currentTimeMillis()));
+
+ //add to diagnostic
+ taskAttempt.addDiagnosticInfo(newReportedStatus.diagnosticInfo);
+
+ //if fetch failures are present, send the fetch failure event to job
+ //this only will happen in reduce attempt type
+ if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
+ taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
+ taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
+ taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
+ }
+ }
+ }
+
+ private static class DiagnosticInformationUpdater
+ implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ TaskAttemptDiagnosticsUpdateEvent diagEvent =
+ (TaskAttemptDiagnosticsUpdateEvent) event;
+ LOG.info("Diagnostics report from " + taskAttempt.attemptId + ": "
+ + diagEvent.getDiagnosticInfo());
+ taskAttempt.addDiagnosticInfo(diagEvent.getDiagnosticInfo());
+ }
+ }
+
+ private void initTaskAttemptStatus(TaskAttemptStatus result) {
+    result.progress = 0f;
+    result.diagnosticInfo = "";
+    result.phase = Phase.STARTING;
+    result.stateString = "NEW";
+ Counters counters = new Counters();
+ counters.groups = new HashMap<CharSequence, CounterGroup>();
+ result.counters = counters;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,743 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobMapTaskRescheduledEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Implementation of Task interface.
+ */
+public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
+
+ private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+
+ protected final Configuration conf;
+ protected final Path jobFile;
+ protected final OutputCommitter committer;
+ protected final int partition;
+ protected final TaskAttemptListener taskAttemptListener;
+ protected final EventHandler eventHandler;
+ private final TaskID taskId;
+ private Map<TaskAttemptID, TaskAttempt> attempts;
+ private final int maxAttempts;
+ private final Lock readLock;
+ private final Lock writeLock;
+ protected Collection<Token<? extends TokenIdentifier>> fsTokens;
+ protected Token<JobTokenIdentifier> jobToken;
+
+ // counts the number of attempts that are either running or in a state where
+ // they will come to be running when they get a Container
+ private int numberUncompletedAttempts = 0;
+
+ private static final SingleArcTransition<TaskImpl, TaskEvent>
+ ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+ private static final SingleArcTransition<TaskImpl, TaskEvent>
+ KILL_TRANSITION = new KillTransition();
+
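+  // Summary of the task-level lifecycle encoded by the transitions below:
+  //   NEW -> SCHEDULED -> RUNNING -> SUCCEEDED,
+  // with KILL_WAIT draining live attempts after a kill request, and FAILED
+  // reached when a failed attempt cannot be retried.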
+ private static final StateMachineFactory
+ <TaskImpl, TaskState, TaskEventType, TaskEvent>
+ stateMachineFactory
+ = new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent>
+ (TaskState.NEW)
+
+ // define the state machine of Task
+
+ // Transitions from NEW state
+ .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+ TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
+ .addTransition(TaskState.NEW, TaskState.KILLED,
+ TaskEventType.T_KILL, new KillNewTransition())
+
+ // Transitions from SCHEDULED state
+ //when the first attempt is launched, the task state is set to RUNNING
+ .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED)
+ .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+ TaskEventType.T_KILL, KILL_TRANSITION)
+ .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED,
+ TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
+ .addTransition(TaskState.SCHEDULED,
+ EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new AttemptFailedTransition())
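+    // AttemptFailedTransition is a MultipleArcTransition: at runtime it
+    // picks the post-state, SCHEDULED to retry with a fresh attempt or
+    // FAILED (presumably once maxAttempts is exhausted).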
+
+ // Transitions from RUNNING state
+ .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
+ .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+ new AttemptCommitPendingTransition())
+ .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
+ .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ new AttemptSucceededTransition())
+ .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_KILLED,
+ ATTEMPT_KILLED_TRANSITION)
+ .addTransition(TaskState.RUNNING,
+ EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new AttemptFailedTransition())
+ .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+ TaskEventType.T_KILL, KILL_TRANSITION)
+
+ // Transitions from KILL_WAIT state
+ .addTransition(TaskState.KILL_WAIT,
+ EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED),
+ TaskEventType.T_ATTEMPT_KILLED,
+ new KillWaitAttemptKilledTransition())
+ // Ignore-able transitions.
+ .addTransition(
+ TaskState.KILL_WAIT,
+ TaskState.KILL_WAIT,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ATTEMPT_LAUNCHED,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+ TaskEventType.T_ATTEMPT_FAILED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+ // Transitions from SUCCEEDED state
+ .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+ EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+ // Ignore-able transitions.
+ .addTransition(
+ TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_ATTEMPT_LAUNCHED))
+
+ // Transitions from FAILED state
+ .addTransition(TaskState.FAILED, TaskState.FAILED,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+ // Transitions from KILLED state
+ .addTransition(TaskState.KILLED, TaskState.KILLED,
+ EnumSet.of(TaskEventType.T_KILL,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+ // create the topology tables
+ .installTopology();
+
+ private final StateMachine<TaskState, TaskEventType, TaskEvent>
+ stateMachine;
+
+ protected int nextAttemptNumber;
+
+  //set to the first attempt that reports COMMIT_PENDING
+ private TaskAttemptID commitAttempt;
+
+ private TaskAttemptID successfulAttempt;
+
+ private int failedAttempts;
+  private int finishedAttempts;//total of succeeded, failed and killed attempts
+
+ @Override
+ public TaskState getState() {
+ return stateMachine.getCurrentState();
+ }
+
+ public TaskImpl(JobID jobId, TaskType taskType, int partition,
+ EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
+ TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken,
+ Collection<Token<? extends TokenIdentifier>> fsTokens) {
+ this.conf = conf;
+ this.jobFile = remoteJobConfFile;
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ readLock = readWriteLock.readLock();
+ writeLock = readWriteLock.writeLock();
+ this.attempts = Collections.emptyMap();
+ // This overridable method call is okay in a constructor because we
+ // have a convention that none of the overrides depends on any
+ // fields that need initialization.
+ maxAttempts = getMaxAttempts();
+ taskId = new TaskID();
+ taskId.jobID = jobId;
+ taskId.id = partition;
+ taskId.taskType = taskType;
+ this.partition = partition;
+ this.taskAttemptListener = taskAttemptListener;
+ this.eventHandler = eventHandler;
+ this.committer = committer;
+ this.fsTokens = fsTokens;
+ this.jobToken = jobToken;
+
+ // This "this leak" is okay because the retained pointer is in an
+ // instance variable.
+ stateMachine = stateMachineFactory.make(this);
+ }
+
+ @Override
+ public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+ readLock.lock();
+
+ try {
+ if (attempts.size() <= 1) {
+ return attempts;
+ }
+
+ Map<TaskAttemptID, TaskAttempt> result
+ = new LinkedHashMap<TaskAttemptID, TaskAttempt>();
+ result.putAll(attempts);
+
+ return result;
+ } finally {
+ readLock.unlock();
+ }
+ }
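+
+  // Note: with zero or one attempts the internal (immutable) map is returned
+  // directly; with more, a defensive LinkedHashMap copy is handed out so
+  // callers never observe concurrent modification.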
+
+ @Override
+ public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+ readLock.lock();
+ try {
+ return attempts.get(attemptID);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public TaskID getID() {
+ return taskId;
+ }
+
+ @Override
+ public boolean isFinished() {
+ readLock.lock();
+ try {
+ // TODO: Use stateMachine level method?
+ return (getState() == TaskState.SUCCEEDED ||
+ getState() == TaskState.FAILED ||
+ getState() == TaskState.KILLED);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public TaskReport getReport() {
+ TaskReport report = new TaskReport();
+ readLock.lock();
+ try {
+ report.id = taskId;
+ report.startTime = getLaunchTime();
+ report.finishTime = getFinishTime();
+ report.state = getState();
+ report.progress = getProgress();
+ report.counters = getCounters();
+ report.runningAttempts = new ArrayList<TaskAttemptID>();
+ report.runningAttempts.addAll(attempts.keySet());
+ report.successfulAttempt = successfulAttempt;
+
+ report.diagnostics = new ArrayList<CharSequence>();
+ for (TaskAttempt att : attempts.values()) {
+ String prefix = "AttemptID:" + att.getID() + " Info:";
+ for (CharSequence cs : att.getDiagnostics()) {
+ report.diagnostics.add(prefix + cs);
+ }
+ }
+ return report;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public Counters getCounters() {
+ Counters counters = null;
+ readLock.lock();
+ try {
+ TaskAttempt bestAttempt = selectBestAttempt();
+ if (bestAttempt != null) {
+ counters = bestAttempt.getCounters();
+ } else {
+ counters = new Counters();
+ counters.groups = new HashMap<CharSequence, CounterGroup>();
+ }
+ return counters;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ readLock.lock();
+ try {
+ TaskAttempt bestAttempt = selectBestAttempt();
+ if (bestAttempt == null) {
+ return 0;
+ }
+ return bestAttempt.getProgress();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // always called while holding the read or write lock
+ private long getLaunchTime() {
+ long launchTime = 0;
+ for (TaskAttempt at : attempts.values()) {
+ //select the earliest launch time of all attempts
+ if (launchTime == 0 || launchTime > at.getLaunchTime()) {
+ launchTime = at.getLaunchTime();
+ }
+ }
+ return launchTime;
+ }
+
+ // always called while holding the read or write lock
+ private long getFinishTime() {
+ if (!isFinished()) {
+ return 0;
+ }
+ long finishTime = 0;
+ for (TaskAttempt at : attempts.values()) {
+ //select the max finish time of all attempts
+ if (finishTime < at.getFinishTime()) {
+ finishTime = at.getFinishTime();
+ }
+ }
+ return finishTime;
+ }
+
+ // select the attempt with the best progress
+ // always called inside the read lock
+ private TaskAttempt selectBestAttempt() {
+ float progress = 0f;
+ TaskAttempt result = null;
+ for (TaskAttempt at : attempts.values()) {
+ if (result == null) {
+ result = at; //The first time around
+ }
+ //TODO: consider the attempt only if it is not failed/killed ?
+ // calculate the best progress
+ if (at.getProgress() > progress) {
+ result = at;
+ progress = at.getProgress();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public boolean canCommit(TaskAttemptID taskAttemptID) {
+ readLock.lock();
+ boolean canCommit = false;
+ try {
+ if (commitAttempt != null) {
+ canCommit = taskAttemptID.equals(commitAttempt);
+ LOG.info("Result of canCommit for " + taskAttemptID + ":" + canCommit);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return canCommit;
+ }
+
+ protected abstract TaskAttemptImpl createAttempt();
+
+ // No override of this method may require that the subclass be initialized.
+ protected abstract int getMaxAttempts();
+
+ protected TaskAttempt getSuccessfulAttempt() {
+ readLock.lock();
+ try {
+ if (null == successfulAttempt) {
+ return null;
+ }
+ return attempts.get(successfulAttempt);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // This is always called in the Write Lock
+ private void addAndScheduleAttempt() {
+ TaskAttempt attempt = createAttempt();
+ LOG.info("Created attempt " + attempt.getID());
+ switch (attempts.size()) {
+ case 0:
+ attempts = Collections.singletonMap(attempt.getID(), attempt);
+ break;
+
+ case 1:
+ Map<TaskAttemptID, TaskAttempt> newAttempts
+ = new LinkedHashMap<TaskAttemptID, TaskAttempt>(maxAttempts);
+ newAttempts.putAll(attempts);
+ attempts = newAttempts;
+ attempts.put(attempt.getID(), attempt);
+ break;
+
+ default:
+ attempts.put(attempt.getID(), attempt);
+ break;
+ }
+ ++nextAttemptNumber;
+ ++numberUncompletedAttempts;
+ //schedule the new attempt
+ eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_SCHEDULE));
+ }
+
+ @Override
+ public void handle(TaskEvent event) {
+ LOG.info("Processing " + event.getTaskID() + " of type " + event.getType());
+ writeLock.lock();
+ try {
+ TaskState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
+ internalError(event.getType());
+ }
+ if (oldState != getState()) {
+ LOG.info(taskId + " Task Transitioned from " + oldState + " to "
+ + getState());
+ }
+
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void internalError(TaskEventType type) {
+ eventHandler.handle(new JobDiagnosticsUpdateEvent(
+ this.taskId.jobID, "Invalid event " + type +
+ " on Task " + this.taskId));
+ eventHandler.handle(new JobEvent(this.taskId.jobID,
+ JobEventType.INTERNAL_ERROR));
+ }
+
+ // always called inside a transition, in turn inside the Write Lock
+ private void handleTaskAttemptCompletion(TaskAttemptID attemptId,
+ TaskAttemptCompletionEventStatus status) {
+ finishedAttempts++;
+ TaskAttempt attempt = attempts.get(attemptId);
+ //raise the completion event only if the container is assigned
+ // to the attempt
+ if (attempt.getAssignedContainerMgrAddress() != null) {
+ TaskAttemptCompletionEvent tce = new TaskAttemptCompletionEvent();
+ tce.eventId = -1;
+ //TODO: the map output server port (8080) is hardcoded below
+ tce.mapOutputServerAddress =
+ "http://" + attempt.getAssignedContainerMgrAddress().split(":")[0]
+ + ":8080";
+ tce.status = status;
+ tce.attemptId = attempt.getID();
+ tce.attemptRunTime = 0; // TODO: set the exact run time of the task.
+
+ //raise the event to job so that it adds the completion event to its
+ //data structures
+ eventHandler.handle(new JobTaskAttemptCompletedEvent(tce));
+ }
+ }
+
+ private static class InitialScheduleTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ TaskStartedEvent tse = new TaskStartedEvent(TypeConverter
+ .fromYarn(task.taskId), task.getLaunchTime(), TypeConverter
+ .fromYarn(task.taskId.taskType), TaskState.RUNNING.toString());
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tse));
+
+ task.addAndScheduleAttempt();
+ }
+ }
+
+ // Used when creating a new attempt while one is already running.
+ // Currently we do this for speculation. In the future we may do this
+ // for tasks that failed in a way that might indicate application code
+ // problems, so we can take later failures in parallel and flush the
+ // job quickly when this happens.
+ private static class RedundantScheduleTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ LOG.info("Scheduling a redundant attempt for task " + task.taskId);
+ task.addAndScheduleAttempt();
+ }
+ }
+
+ private static class AttemptCommitPendingTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
+ // The attempt is commit pending; decide whether to make it the commitAttempt
+ TaskAttemptID attemptID = ev.getTaskAttemptID();
+ if (task.commitAttempt == null) {
+ // TODO: validate attemptID
+ task.commitAttempt = attemptID;
+ LOG.info(attemptID + " given a go for committing the task output.");
+ } else {
+ // Don't think this can be a pluggable decision, so simply raise an
+ // event for the TaskAttempt to delete its output.
+ LOG.info(task.commitAttempt
+ + " already given a go for committing the task output, so killing "
+ + attemptID);
+ task.eventHandler.handle(new TaskAttemptEvent(
+ attemptID, TaskAttemptEventType.TA_KILL));
+ }
+ }
+ }
+
+ private static class AttemptSucceededTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ task.handleTaskAttemptCompletion(
+ ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ TaskAttemptCompletionEventStatus.SUCCEEDED);
+ --task.numberUncompletedAttempts;
+ task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+ task.eventHandler.handle(new JobTaskEvent(
+ task.taskId, TaskState.SUCCEEDED));
+ LOG.info("Task succeeded with attempt " + task.successfulAttempt);
+ // record the successful completion in the job history, then
+ // issue a kill to all other attempts
+ TaskFinishedEvent tfe =
+ new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+ task.getFinishTime(),
+ TypeConverter.fromYarn(task.taskId.taskType),
+ TaskState.SUCCEEDED.toString(),
+ TypeConverter.fromYarn(task.getCounters()));
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+ for (TaskAttempt attempt : task.attempts.values()) {
+ if (!attempt.getID().equals(task.successfulAttempt) &&
+ // don't kill the successful attempt, or any attempt that
+ // has already finished
+ !attempt.isFinished()) {
+ LOG.info("Issuing kill to other attempt " + attempt.getID());
+ task.eventHandler.handle(
+ new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_KILL));
+ }
+ }
+ }
+ }
+
+ private static class AttemptKilledTransition implements
+ SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ task.handleTaskAttemptCompletion(
+ ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ TaskAttemptCompletionEventStatus.KILLED);
+ --task.numberUncompletedAttempts;
+ if (task.successfulAttempt == null) {
+ task.addAndScheduleAttempt();
+ }
+ }
+ }
+
+
+ private static class KillWaitAttemptKilledTransition implements
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+ protected TaskState finalState = TaskState.KILLED;
+
+ @Override
+ public TaskState transition(TaskImpl task, TaskEvent event) {
+ task.handleTaskAttemptCompletion(
+ ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ TaskAttemptCompletionEventStatus.KILLED);
+ // check whether all attempts are finished
+ if (task.finishedAttempts == task.attempts.size()) {
+ task.eventHandler.handle(
+ new JobTaskEvent(task.taskId, finalState));
+ return finalState;
+ }
+ return task.getState();
+ }
+ }
+
+ private static class AttemptFailedTransition implements
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+ @Override
+ public TaskState transition(TaskImpl task, TaskEvent event) {
+ task.failedAttempts++;
+ TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+ if (task.failedAttempts < task.maxAttempts) {
+ task.handleTaskAttemptCompletion(
+ castEvent.getTaskAttemptID(),
+ TaskAttemptCompletionEventStatus.FAILED);
+ // we don't need a new attempt if we already have a spare
+ if (--task.numberUncompletedAttempts == 0
+ && task.successfulAttempt == null) {
+ task.addAndScheduleAttempt();
+ }
+ } else {
+ task.handleTaskAttemptCompletion(
+ castEvent.getTaskAttemptID(),
+ TaskAttemptCompletionEventStatus.TIPFAILED);
+ TaskFinishedEvent tfi =
+ new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+ task.getFinishTime(),
+ TypeConverter.fromYarn(task.taskId.taskType),
+ TaskState.FAILED.toString(),
+ TypeConverter.fromYarn(task.getCounters()));
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfi));
+ task.eventHandler.handle(
+ new JobTaskEvent(task.taskId, TaskState.FAILED));
+ return TaskState.FAILED;
+ }
+ return getDefaultState(task);
+ }
+
+ protected TaskState getDefaultState(Task task) {
+ return task.getState();
+ }
+
+ protected void unSucceed(TaskImpl task) {
+ ++task.numberUncompletedAttempts;
+ task.successfulAttempt = null;
+ }
+ }
+
+ private static class MapRetroactiveFailureTransition
+ extends AttemptFailedTransition {
+
+ @Override
+ public TaskState transition(TaskImpl task, TaskEvent event) {
+ //verify that this occurs only for map tasks
+ //TODO: consider moving this check to MapTaskImpl
+ if (!TaskType.MAP.equals(task.getType())) {
+ LOG.error("Unexpected event for REDUCE task " + event.getType());
+ task.internalError(event.getType());
+ }
+
+ // tell the job about the rescheduling
+ task.eventHandler.handle(
+ new JobMapTaskRescheduledEvent(task.taskId));
+ // super.transition is mostly coded for the case where an
+ // UNcompleted task failed. When a COMPLETED task retroactively
+ // fails, we have to let AttemptFailedTransition.transition
+ // believe that there's no redundancy.
+ unSucceed(task);
+ return super.transition(task, event);
+ }
+
+ @Override
+ protected TaskState getDefaultState(Task task) {
+ return TaskState.SCHEDULED;
+ }
+ }
+
+ private static class KillNewTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ TaskFinishedEvent tfe =
+ new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+ task.getFinishTime(), TypeConverter.fromYarn(task.taskId.taskType),
+ TaskState.KILLED.toString(), TypeConverter.fromYarn(task
+ .getCounters()));
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+ task.eventHandler.handle(
+ new JobTaskEvent(task.taskId, TaskState.KILLED));
+ }
+ }
+
+ private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+ if (attempt != null && !attempt.isFinished()) {
+ LOG.info(logMsg);
+ eventHandler.handle(
+ new TaskAttemptEvent(attempt.getID(),
+ TaskAttemptEventType.TA_KILL));
+ }
+ }
+
+ private static class KillTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> {
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ // issue a kill to all unfinished attempts
+ for (TaskAttempt attempt : task.attempts.values()) {
+ task.killUnfinishedAttempt
+ (attempt, "Task KILL is received. Killing attempt!");
+ }
+
+ task.numberUncompletedAttempts = 0;
+ }
+ }
+}
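TaskImpl above leaves exactly two hooks abstract, createAttempt() and getMaxAttempts(), and its constructor comment requires that no override of getMaxAttempts() read subclass fields, since the call happens while the superclass constructor is still running. A minimal sketch of a concrete subclass honoring that convention might look like the following; the class name, the constant default of 4, and the exact packages of the v2 record types (JobID, TaskType) are illustrative assumptions, not part of this commit:

    package org.apache.hadoop.mapreduce.v2.app.job.impl;

    import java.util.Collection;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
    import org.apache.hadoop.mapreduce.v2.api.JobID;    // package assumed
    import org.apache.hadoop.mapreduce.v2.api.TaskType; // package assumed
    import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;
    import org.apache.hadoop.yarn.event.EventHandler;

    // Illustrative subclass only -- not part of this commit.
    public class SketchMapTaskImpl extends TaskImpl {

      public SketchMapTaskImpl(JobID jobId, int partition,
          EventHandler eventHandler, Path remoteJobConfFile,
          Configuration conf, TaskAttemptListener taskAttemptListener,
          OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
          Collection<Token<? extends TokenIdentifier>> fsTokens) {
        super(jobId, TaskType.MAP, partition, eventHandler,
            remoteJobConfFile, conf, taskAttemptListener, committer,
            jobToken, fsTokens);
      }

      @Override
      protected TaskAttemptImpl createAttempt() {
        // A real subclass would build a map-flavored TaskAttemptImpl here.
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      protected int getMaxAttempts() {
        // Called from the TaskImpl constructor, before this subclass's own
        // fields exist -- so return a constant (or read only configuration
        // threaded through the super constructor). The value 4 is an
        // assumption for illustration.
        return 4;
      }
    }
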
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface ContainerLauncher
+ extends EventHandler<ContainerLauncherEvent> {
+
+ enum EventType {
+ CONTAINER_REMOTE_LAUNCH,
+ CONTAINER_REMOTE_CLEANUP
+ }
+}
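Since ContainerLauncher is just an EventHandler specialized to these two remote-container event types, a skeletal implementation can dispatch on getType(); this sketch is illustrative (the class name and the method bodies are assumptions, not the branch's actual launcher):

    package org.apache.hadoop.mapreduce.v2.app.launcher;

    // Illustrative skeleton, not the branch's real launcher implementation.
    public class SketchContainerLauncher implements ContainerLauncher {

      @Override
      public void handle(ContainerLauncherEvent event) {
        switch (event.getType()) {
        case CONTAINER_REMOTE_LAUNCH:
          // Contact the node at event.getContainerMgrAddress(),
          // authenticating with event.getContainerToken(), and launch the
          // container for event.getTaskAttemptID().
          break;
        case CONTAINER_REMOTE_CLEANUP:
          // Stop and clean up the container that event.getContainerID()
          // refers to.
          break;
        }
      }
    }
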
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,66 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+public class ContainerLauncherEvent
+ extends AbstractEvent<ContainerLauncher.EventType> {
+
+ private TaskAttemptID taskAttemptID;
+ private ContainerID containerID;
+ private String containerMgrAddress;
+ private ContainerToken containerToken;
+
+ public ContainerLauncherEvent(TaskAttemptID taskAttemptID,
+ ContainerID containerID,
+ String containerMgrAddress,
+ ContainerToken containerToken,
+ ContainerLauncher.EventType type) {
+ super(type);
+ this.taskAttemptID = taskAttemptID;
+ this.containerID = containerID;
+ this.containerMgrAddress = containerMgrAddress;
+ this.containerToken = containerToken;
+ }
+
+ public TaskAttemptID getTaskAttemptID() {
+ return this.taskAttemptID;
+ }
+
+ public ContainerID getContainerID() {
+ return containerID;
+ }
+
+ public String getContainerMgrAddress() {
+ return containerMgrAddress;
+ }
+
+ public ContainerToken getContainerToken() {
+ return containerToken;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " for taskAttempt " + taskAttemptID;
+ }
+}
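As a usage fragment, the attempt-side code would construct one of these events and hand it to whatever dispatcher routes ContainerLauncher events; the variable names below are placeholders assumed to be in scope, as they are in the container-assignment path of the app master:

    // Fragment, not a complete class: taskAttemptID, containerID,
    // containerMgrAddress, containerToken and eventHandler are assumed
    // to be bound by the surrounding code.
    eventHandler.handle(new ContainerLauncherEvent(taskAttemptID,
        containerID, containerMgrAddress, containerToken,
        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));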