Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Mar 31 22:23:22 2011
@@ -83,32 +83,33 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ContainerBuilderHelper;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalResourceType;
-import org.apache.hadoop.yarn.LocalResourceVisibility;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.URL;
-import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.Phase;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
/**
* Implementation of TaskAttempt interface.
@@ -119,18 +120,19 @@ public abstract class TaskAttemptImpl im
EventHandler<TaskAttemptEvent> {
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final Configuration conf;
protected final Path jobFile;
protected final int partition;
protected final EventHandler eventHandler;
- private final TaskAttemptID attemptId;
+ private final TaskAttemptId attemptId;
private final org.apache.hadoop.mapred.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final OutputCommitter committer;
private final Resource resourceCapability;
private final String[] dataLocalHosts;
- private final List<CharSequence> diagnostics = new ArrayList<CharSequence>();
+ private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
private Collection<Token<? extends TokenIdentifier>> fsTokens;
@@ -360,7 +362,7 @@ public abstract class TaskAttemptImpl im
<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
stateMachine;
- private ContainerID containerID;
+ private ContainerId containerID;
private String containerMgrAddress;
private WrappedJvmID jvmID;
private ContainerToken containerToken;
@@ -372,16 +374,16 @@ public abstract class TaskAttemptImpl im
//this is the last status reported by the REMOTE running attempt
private TaskAttemptStatus reportedStatus;
- public TaskAttemptImpl(TaskID taskId, int i, EventHandler eventHandler,
+ public TaskAttemptImpl(TaskId taskId, int i, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Collection<Token<? extends TokenIdentifier>> fsTokens) {
- oldJobId = TypeConverter.fromYarn(taskId.jobID);
+ oldJobId = TypeConverter.fromYarn(taskId.getJobId());
this.conf = conf;
- attemptId = new TaskAttemptID();
- attemptId.taskID = taskId;
- attemptId.id = i;
+ attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
+ attemptId.setTaskId(taskId);
+ attemptId.setId(i);
this.taskAttemptListener = taskAttemptListener;
// Initialize reportedStatus
@@ -400,8 +402,8 @@ public abstract class TaskAttemptImpl im
this.partition = partition;
//TODO:create the resource reqt for this Task attempt
- this.resourceCapability = new Resource();
- this.resourceCapability.memory = getMemoryRequired(conf, taskId.taskType);
+ this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
+ this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
this.dataLocalHosts = dataLocalHosts;
// This "this leak" is okay because the retained pointer is in an
@@ -424,36 +426,31 @@ public abstract class TaskAttemptImpl im
LocalResourceType type, LocalResourceVisibility visibility)
throws IOException {
FileStatus fstat = fc.getFileStatus(file);
- LocalResource resource = new LocalResource();
- resource.resource = AvroUtil.getYarnUrlFromPath(fstat.getPath());
- resource.type = type;
- resource.state = visibility;
- resource.size = fstat.getLen();
- resource.timestamp = fstat.getModificationTime();
+ LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
+ resource.setResource(ConverterUtils.getYarnUrlFromPath(fstat.getPath()));
+ resource.setType(type);
+ resource.setVisibility(visibility);
+ resource.setSize(fstat.getLen());
+ resource.setTimestamp(fstat.getModificationTime());
return resource;
}
private ContainerLaunchContext getContainer() {
- ContainerLaunchContext container = new ContainerLaunchContext();
- container.serviceData = new HashMap<CharSequence, ByteBuffer>();
- container.resources = new HashMap<CharSequence, LocalResource>();
- container.env = new HashMap<CharSequence, CharSequence>();
-
+ ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
try {
FileContext remoteFS = FileContext.getFileContext(conf);
Path localizedJobConf = new Path(YARNApplicationConstants.JOB_CONF_FILE);
remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!!
- URL jobConfFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(localizedJobConf);
+ URL jobConfFileOnRemoteFS = ConverterUtils.getYarnUrlFromPath(localizedJobConf);
LOG.info("The job-conf file on the remote FS is " + jobConfFileOnRemoteFS);
Path jobJar = remoteFS.makeQualified(new Path(remoteTask.getConf().get(MRJobConfig.JAR)));
- URL jobJarFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(jobJar);
- container.resources.put(YARNApplicationConstants.JOB_JAR,
- getLocalResource(remoteFS, jobJar,
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ URL jobJarFileOnRemoteFS = ConverterUtils.getYarnUrlFromPath(jobJar);
+ container.setLocalResource(YARNApplicationConstants.JOB_JAR, getLocalResource(remoteFS, jobJar,
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-jar file on the remote FS is " + jobJarFileOnRemoteFS);
Path jobSubmitDir =
@@ -463,13 +460,14 @@ public abstract class TaskAttemptImpl im
remoteFS.makeQualified(new Path(jobSubmitDir,
YarnConfiguration.APPLICATION_TOKENS_FILE));
URL applicationTokenFileOnRemoteFS =
- AvroUtil.getYarnUrlFromPath(jobTokenFile);
+ ConverterUtils.getYarnUrlFromPath(jobTokenFile);
// TODO: Looks like this is not needed. Revisit during localization
// cleanup.
//container.resources_todo.put(YarnConfiguration.APPLICATION_TOKENS_FILE,
// getLocalResource(remoteFS, jobTokenFile,
// LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
- container.resources.put(YARNApplicationConstants.JOB_CONF_FILE,
+
+ container.setLocalResource(YARNApplicationConstants.JOB_CONF_FILE,
getLocalResource(remoteFS,
new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
@@ -499,25 +497,27 @@ public abstract class TaskAttemptImpl im
LOG.info("Size of containertokens_dob is "
+ taskCredentials.numberOfTokens());
taskCredentials.writeTokenStorageToStream(containerTokens_dob);
- container.containerTokens =
+ container.setContainerTokens(
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
- containerTokens_dob.getLength());
+ containerTokens_dob.getLength()));
// Add shuffle token
LOG.info("Putting shuffle token in serviceData");
DataOutputBuffer jobToken_dob = new DataOutputBuffer();
jobToken.write(jobToken_dob);
// TODO: should depend on ShuffleHandler
- container.serviceData.put("mapreduce.shuffle",
+ container.setServiceData("mapreduce.shuffle",
ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()));
- MRApps.setInitialClasspath(container.env);
+ Map<String, String> env = new HashMap<String, String>();
+ MRApps.setInitialClasspath(env);
+ container.addAllEnv(env);
} catch (IOException e) {
throw new YarnException(e);
}
- container.id = containerID;
- container.user = conf.get(MRJobConfig.USER_NAME); // TODO: Fix
+ container.setContainerId(containerID);
+ container.setUser(conf.get(MRJobConfig.USER_NAME)); // TODO: Fix
File workDir = new File(ContainerBuilderHelper.getWorkDir());
String logDir = new File(workDir, "logs").toString();
@@ -535,18 +535,17 @@ public abstract class TaskAttemptImpl im
classPaths.add(workDir.toString()); // TODO
// Construct the actual Container
- container.command =
- MapReduceChildJVM.getVMCommand(taskAttemptListener.getAddress(),
- remoteTask, javaHome, workDir.toString(), logDir,
- childTmpDir, jvmID);
+ container.addAllCommands(MapReduceChildJVM.getVMCommand(taskAttemptListener.getAddress(),
+ remoteTask, javaHome, workDir.toString(), logDir,
+ childTmpDir, jvmID));
- MapReduceChildJVM.setVMEnv(container.env, classPaths, workDir.toString(),
+ MapReduceChildJVM.setVMEnv(container.getAllEnv(), classPaths, workDir.toString(),
nmLdLibraryPath, remoteTask, localizedApplicationTokensFile);
// Construct the actual Container
- container.id = containerID;
- container.user = conf.get(MRJobConfig.USER_NAME);
- container.resource = resourceCapability;
+ container.setContainerId(containerID);
+ container.setUser(conf.get(MRJobConfig.USER_NAME));
+ container.setResource(resourceCapability);
return container;
}
@@ -606,7 +605,7 @@ public abstract class TaskAttemptImpl im
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
}
- container.resources.put(
+ container.setLocalResource(
name.toUri().getPath(),
BuilderUtils.newLocalResource(
uris[i], type,
@@ -625,13 +624,13 @@ public abstract class TaskAttemptImpl im
private static final String CLASSPATH = "CLASSPATH";
private static void addCacheArtifactToClassPath(
ContainerLaunchContext container, String fileName) {
- CharSequence classpath = container.env.get(CLASSPATH);
+ String classpath = container.getEnv(CLASSPATH);
if (classpath == null) {
classpath = fileName;
} else {
classpath = classpath + ":" + fileName;
}
- container.env.put(CLASSPATH, classpath);
+ container.setEnv(CLASSPATH, classpath);
}
// TODO - Move this to MR!
@@ -648,7 +647,7 @@ public abstract class TaskAttemptImpl im
}
@Override
- public ContainerID getAssignedContainerID() {
+ public ContainerId getAssignedContainerID() {
readLock.lock();
try {
return containerID;
@@ -694,7 +693,7 @@ public abstract class TaskAttemptImpl im
protected abstract int getPriority();
@Override
- public TaskAttemptID getID() {
+ public TaskAttemptId getID() {
return attemptId;
}
@@ -713,20 +712,21 @@ public abstract class TaskAttemptImpl im
@Override
public TaskAttemptReport getReport() {
- TaskAttemptReport result = new TaskAttemptReport();
+ TaskAttemptReport result = recordFactory.newRecordInstance(TaskAttemptReport.class);
readLock.lock();
try {
- result.id = attemptId;
+ result.setTaskAttemptId(attemptId);
//take the LOCAL state of attempt
//DO NOT take from reportedStatus
- result.state = getState();
- result.progress = reportedStatus.progress;
- result.startTime = launchTime;
- result.finishTime = finishTime;
- result.diagnosticInfo = reportedStatus.diagnosticInfo;
- result.phase = reportedStatus.phase;
- result.stateString = reportedStatus.stateString;
- result.counters = getCounters();
+
+ result.setTaskAttemptState(getState());
+ result.setProgress(reportedStatus.progress);
+ result.setStartTime(launchTime);
+ result.setFinishTime(finishTime);
+ result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
+ result.setPhase(reportedStatus.phase);
+ result.setStateString(reportedStatus.stateString);
+ result.setCounters(getCounters());
return result;
} finally {
readLock.unlock();
@@ -734,8 +734,8 @@ public abstract class TaskAttemptImpl im
}
@Override
- public List<CharSequence> getDiagnostics() {
- List<CharSequence> result = new ArrayList<CharSequence>();
+ public List<String> getDiagnostics() {
+ List<String> result = new ArrayList<String>();
readLock.lock();
try {
result.addAll(diagnostics);
@@ -751,8 +751,8 @@ public abstract class TaskAttemptImpl im
try {
Counters counters = reportedStatus.counters;
if (counters == null) {
- counters = new Counters();
- counters.groups = new HashMap<CharSequence, CounterGroup>();
+ counters = recordFactory.newRecordInstance(Counters.class);
+// counters.groups = new HashMap<String, CounterGroup>();
}
return counters;
} finally {
@@ -792,9 +792,9 @@ public abstract class TaskAttemptImpl im
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
- this.attemptId.taskID.jobID, "Invalid event " + event.getType() +
+ this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
" on TaskAttempt " + this.attemptId));
- eventHandler.handle(new JobEvent(this.attemptId.taskID.jobID,
+ eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
if (oldState != getState()) {
@@ -823,7 +823,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptEvent event) {
// Tell any speculator that we're requesting a container
taskAttempt.eventHandler.handle
- (new SpeculatorEvent(taskAttempt.getID().taskID, +1));
+ (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
//request for container
taskAttempt.eventHandler.handle(
new ContainerRequestEvent(taskAttempt.attemptId,
@@ -845,7 +845,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
taskAttempt.jvmID = new WrappedJvmID(
taskAttempt.remoteTask.getTaskID().getJobID(),
- taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.id);
+ taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
//launch the container
//create the container object to be launched for a given Task attempt
@@ -865,7 +865,7 @@ public abstract class TaskAttemptImpl im
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler.handle
- (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+ (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
}
}
@@ -892,7 +892,7 @@ public abstract class TaskAttemptImpl im
// we're transitioning out of UNASSIGNED
if (withdrawsContainerRequest) {
taskAttempt.eventHandler.handle
- (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+ (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
}
switch(finalState) {
@@ -923,11 +923,11 @@ public abstract class TaskAttemptImpl im
taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime,
"tracker", 0);
taskAttempt.eventHandler.handle
- (new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tase));
+ (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.attemptId, true, System.currentTimeMillis()));
@@ -976,23 +976,23 @@ public abstract class TaskAttemptImpl im
//set the finish time
taskAttempt.setFinishTime();
String taskType =
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType).toString();
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
LOG.info("In TaskAttemptImpl taskType: " + taskType);
if (taskType.equals("MAP")) {
MapAttemptFinishedEvent mfe =
new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
TaskAttemptState.SUCCEEDED.toString(),
taskAttempt.finishTime,
taskAttempt.finishTime, "hostname",
TaskAttemptState.SUCCEEDED.toString(),
TypeConverter.fromYarn(taskAttempt.getCounters()),null);
taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+ new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
} else {
ReduceAttemptFinishedEvent rfe =
new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
TaskAttemptState.SUCCEEDED.toString(),
taskAttempt.finishTime,
taskAttempt.finishTime,
@@ -1000,7 +1000,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptState.SUCCEEDED.toString(),
TypeConverter.fromYarn(taskAttempt.getCounters()),null);
taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+ new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
}
/*
TaskAttemptFinishedEvent tfe =
@@ -1029,28 +1029,28 @@ public abstract class TaskAttemptImpl im
TaskAttemptUnsuccessfulCompletionEvent ta =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
TaskAttemptState.FAILED.toString(),
taskAttempt.finishTime,
"hostname",
taskAttempt.reportedStatus.diagnosticInfo.toString());
taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, ta));
- if (taskAttempt.attemptId.taskID.taskType == TaskType.MAP) {
+ new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta));
+ if (taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe =
new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
TaskAttemptState.FAILED.toString(),
taskAttempt.finishTime,
taskAttempt.finishTime, "hostname",
TaskAttemptState.FAILED.toString(),
TypeConverter.fromYarn(taskAttempt.getCounters()),null);
taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+ new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
} else {
ReduceAttemptFinishedEvent rfe =
new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
TaskAttemptState.FAILED.toString(),
taskAttempt.finishTime,
taskAttempt.finishTime,
@@ -1058,7 +1058,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptState.FAILED.toString(),
TypeConverter.fromYarn(taskAttempt.getCounters()),null);
taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+ new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
@@ -1090,13 +1090,13 @@ public abstract class TaskAttemptImpl im
TaskAttemptUnsuccessfulCompletionEvent tke =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+ TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
TaskAttemptState.KILLED.toString(),
taskAttempt.finishTime,
TaskAttemptState.KILLED.toString(),
taskAttempt.reportedStatus.diagnosticInfo.toString());
taskAttempt.eventHandler.handle(
- new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tke));
+ new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
@@ -1121,7 +1121,7 @@ public abstract class TaskAttemptImpl im
}
}
- private void addDiagnosticInfo(CharSequence diag) {
+ private void addDiagnosticInfo(String diag) {
if (diag != null && !diag.equals("")) {
diagnostics.add(diag);
}
@@ -1175,8 +1175,8 @@ public abstract class TaskAttemptImpl im
result.diagnosticInfo = new String("");
result.phase = Phase.STARTING;
result.stateString = new String("NEW");
- Counters counters = new Counters();
- counters.groups = new HashMap<CharSequence, CounterGroup>();
+ Counters counters = recordFactory.newRecordInstance(Counters.class);
+// counters.groups = new HashMap<String, CounterGroup>();
result.counters = counters;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Mar 31 22:23:22 2011
@@ -40,6 +40,16 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -57,21 +67,13 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
/**
* Implementation of Task interface.
@@ -86,11 +88,14 @@ public abstract class TaskImpl implement
protected final int partition;
protected final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
- private final TaskID taskId;
- private Map<TaskAttemptID, TaskAttempt> attempts;
+ private final TaskId taskId;
+ private Map<TaskAttemptId, TaskAttempt> attempts;
private final int maxAttempts;
private final Lock readLock;
private final Lock writeLock;
+
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
protected Collection<Token<? extends TokenIdentifier>> fsTokens;
protected Token<JobTokenIdentifier> jobToken;
@@ -198,9 +203,9 @@ public abstract class TaskImpl implement
//should be set to one which comes first
//saying COMMIT_PENDING
- private TaskAttemptID commitAttempt;
+ private TaskAttemptId commitAttempt;
- private TaskAttemptID successfulAttempt;
+ private TaskAttemptId successfulAttempt;
private int failedAttempts;
private int finishedAttempts;//finish are total of success, failed and killed
@@ -210,7 +215,7 @@ public abstract class TaskImpl implement
return stateMachine.getCurrentState();
}
- public TaskImpl(JobID jobId, TaskType taskType, int partition,
+ public TaskImpl(JobId jobId, TaskType taskType, int partition,
EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
@@ -225,10 +230,10 @@ public abstract class TaskImpl implement
// have a convention that none of the overrides depends on any
// fields that need initialization.
maxAttempts = getMaxAttempts();
- taskId = new TaskID();
- taskId.jobID = jobId;
- taskId.id = partition;
- taskId.taskType = taskType;
+ taskId = recordFactory.newRecordInstance(TaskId.class);
+ taskId.setJobId(jobId);
+ taskId.setId(partition);
+ taskId.setTaskType(taskType);
this.partition = partition;
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
@@ -242,7 +247,7 @@ public abstract class TaskImpl implement
}
@Override
- public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+ public Map<TaskAttemptId, TaskAttempt> getAttempts() {
readLock.lock();
try {
@@ -250,8 +255,8 @@ public abstract class TaskImpl implement
return attempts;
}
- Map<TaskAttemptID, TaskAttempt> result
- = new LinkedHashMap<TaskAttemptID, TaskAttempt>();
+ Map<TaskAttemptId, TaskAttempt> result
+ = new LinkedHashMap<TaskAttemptId, TaskAttempt>();
result.putAll(attempts);
return result;
@@ -261,7 +266,7 @@ public abstract class TaskImpl implement
}
@Override
- public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+ public TaskAttempt getAttempt(TaskAttemptId attemptID) {
readLock.lock();
try {
return attempts.get(attemptID);
@@ -271,7 +276,7 @@ public abstract class TaskImpl implement
}
@Override
- public TaskID getID() {
+ public TaskId getID() {
return taskId;
}
@@ -290,24 +295,24 @@ public abstract class TaskImpl implement
@Override
public TaskReport getReport() {
- TaskReport report = new TaskReport();
+ TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
readLock.lock();
try {
- report.id = taskId;
- report.startTime = getLaunchTime();
- report.finishTime = getFinishTime();
- report.state = getState();
- report.progress = getProgress();
- report.counters = getCounters();
- report.runningAttempts = new ArrayList<TaskAttemptID>();
- report.runningAttempts.addAll(attempts.keySet());
- report.successfulAttempt = successfulAttempt;
+ report.setTaskId(taskId);
+ report.setStartTime(getLaunchTime());
+ report.setFinishTime(getFinishTime());
+ report.setTaskState(getState());
+ report.setProgress(getProgress());
+ report.setCounters(getCounters());
+
+ report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
+ report.setSuccessfulAttempt(successfulAttempt);
- report.diagnostics = new ArrayList<CharSequence>();
for (TaskAttempt att : attempts.values()) {
String prefix = "AttemptID:" + att.getID() + " Info:";
for (CharSequence cs : att.getDiagnostics()) {
- report.diagnostics.add(prefix + cs);
+ report.addDiagnostics(prefix + cs);
+
}
}
return report;
@@ -325,8 +330,8 @@ public abstract class TaskImpl implement
if (bestAttempt != null) {
counters = bestAttempt.getCounters();
} else {
- counters = new Counters();
- counters.groups = new HashMap<CharSequence, CounterGroup>();
+ counters = recordFactory.newRecordInstance(Counters.class);
+// counters.groups = new HashMap<CharSequence, CounterGroup>();
}
return counters;
} finally {
@@ -395,7 +400,7 @@ public abstract class TaskImpl implement
}
@Override
- public boolean canCommit(TaskAttemptID taskAttemptID) {
+ public boolean canCommit(TaskAttemptId taskAttemptID) {
readLock.lock();
boolean canCommit = false;
try {
@@ -437,7 +442,7 @@ public abstract class TaskImpl implement
case 1:
Map newAttempts
- = new LinkedHashMap<TaskAttemptID, TaskAttempt>(maxAttempts);
+ = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
newAttempts.putAll(attempts);
attempts = newAttempts;
attempts.put(attempt.getID(), attempt);
@@ -478,29 +483,27 @@ public abstract class TaskImpl implement
private void internalError(TaskEventType type) {
eventHandler.handle(new JobDiagnosticsUpdateEvent(
- this.taskId.jobID, "Invalid event " + type +
+ this.taskId.getJobId(), "Invalid event " + type +
" on Task " + this.taskId));
- eventHandler.handle(new JobEvent(this.taskId.jobID,
+ eventHandler.handle(new JobEvent(this.taskId.getJobId(),
JobEventType.INTERNAL_ERROR));
}
// always called inside a transition, in turn inside the Write Lock
- private void handleTaskAttemptCompletion(TaskAttemptID attemptId,
+ private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
TaskAttemptCompletionEventStatus status) {
finishedAttempts++;
TaskAttempt attempt = attempts.get(attemptId);
//raise the completion event only if the container is assigned
// to nextAttemptNumber
if (attempt.getAssignedContainerMgrAddress() != null) {
- TaskAttemptCompletionEvent tce = new TaskAttemptCompletionEvent();
- tce.eventId = -1;
+ TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+ tce.setEventId(-1);
//TODO: XXXXXX hardcoded port
- tce.mapOutputServerAddress =
- "http://" + attempt.getAssignedContainerMgrAddress().split(":")[0]
- + ":8080";
- tce.status = status;
- tce.attemptId = attempt.getID();
- tce.attemptRunTime = 0; // TODO: set the exact run time of the task.
+ tce.setMapOutputServerAddress("http://" + attempt.getAssignedContainerMgrAddress().split(":")[0] + ":8080");
+ tce.setStatus(status);
+ tce.setAttemptId(attempt.getID());
+ tce.setAttemptRunTime(0); // TODO: set the exact run time of the task.
//raise the event to job so that it adds the completion event to its
//data structures
@@ -515,8 +518,8 @@ public abstract class TaskImpl implement
public void transition(TaskImpl task, TaskEvent event) {
TaskStartedEvent tse = new TaskStartedEvent(TypeConverter
.fromYarn(task.taskId), task.getLaunchTime(), TypeConverter
- .fromYarn(task.taskId.taskType), TaskState.RUNNING.toString());
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tse));
+ .fromYarn(task.taskId.getTaskType()), TaskState.RUNNING.toString());
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
task.addAndScheduleAttempt();
}
@@ -543,7 +546,7 @@ public abstract class TaskImpl implement
public void transition(TaskImpl task, TaskEvent event) {
TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
// The nextAttemptNumber is commit pending, decide on set the commitAttempt
- TaskAttemptID attemptID = ev.getTaskAttemptID();
+ TaskAttemptId attemptID = ev.getTaskAttemptID();
if (task.commitAttempt == null) {
// TODO: validate attemptID
task.commitAttempt = attemptID;
@@ -576,10 +579,10 @@ public abstract class TaskImpl implement
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
task.getFinishTime(),
- TypeConverter.fromYarn(task.taskId.taskType),
+ TypeConverter.fromYarn(task.taskId.getTaskType()),
TaskState.SUCCEEDED.toString(),
TypeConverter.fromYarn(task.getCounters()));
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
for (TaskAttempt attempt : task.attempts.values()) {
if (attempt.getID() != task.successfulAttempt &&
// This is okay because it can only talk us out of sending a
@@ -654,10 +657,10 @@ public abstract class TaskImpl implement
TaskFinishedEvent tfi =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
task.getFinishTime(),
- TypeConverter.fromYarn(task.taskId.taskType),
+ TypeConverter.fromYarn(task.taskId.getTaskType()),
TaskState.FAILED.toString(),
TypeConverter.fromYarn(task.getCounters()));
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfi));
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfi));
task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.FAILED));
return TaskState.FAILED;
@@ -710,10 +713,10 @@ public abstract class TaskImpl implement
public void transition(TaskImpl task, TaskEvent event) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
- task.getFinishTime(), TypeConverter.fromYarn(task.taskId.taskType),
+ task.getFinishTime(), TypeConverter.fromYarn(task.taskId.getTaskType()),
TaskState.KILLED.toString(), TypeConverter.fromYarn(task
.getCounters()));
- task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+ task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.KILLED));
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Thu Mar 31 22:23:22 2011
@@ -18,21 +18,21 @@
package org.apache.hadoop.mapreduce.v2.app.launcher;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class ContainerLauncherEvent
extends AbstractEvent<ContainerLauncher.EventType> {
- private TaskAttemptID taskAttemptID;
- private ContainerID containerID;
+ private TaskAttemptId taskAttemptID;
+ private ContainerId containerID;
private String containerMgrAddress;
private ContainerToken containerToken;
- public ContainerLauncherEvent(TaskAttemptID taskAttemptID,
- ContainerID containerID,
+ public ContainerLauncherEvent(TaskAttemptId taskAttemptID,
+ ContainerId containerID,
String containerMgrAddress,
ContainerToken containerToken,
ContainerLauncher.EventType type) {
@@ -43,11 +43,11 @@ public class ContainerLauncherEvent
this.containerToken = containerToken;
}
- public TaskAttemptID getTaskAttemptID() {
+ public TaskAttemptId getTaskAttemptID() {
return this.taskAttemptID;
}
- public ContainerID getContainerID() {
+ public ContainerId getContainerID() {
return containerID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Mar 31 22:23:22 2011
@@ -41,14 +41,18 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerToken;
/**
* This class is responsible for launching of containers.
@@ -117,7 +121,7 @@ public class ContainerLauncherImpl exten
super.stop();
}
- protected ContainerManager getCMProxy(ContainerID containerID,
+ protected ContainerManager getCMProxy(ContainerId containerID,
final String containerManagerBindAddr, ContainerToken containerToken)
throws IOException {
@@ -125,10 +129,10 @@ public class ContainerLauncherImpl exten
if (UserGroupInformation.isSecurityEnabled()) {
Token<ContainerTokenIdentifier> token =
new Token<ContainerTokenIdentifier>(
- containerToken.identifier.array(),
- containerToken.password.array(), new Text(
- containerToken.kind.toString()), new Text(
- containerToken.service.toString()));
+ containerToken.getIdentifier().array(),
+ containerToken.getPassword().array(), new Text(
+ containerToken.getKind()), new Text(
+ containerToken.getService()));
user.addToken(token);
}
ContainerManager proxy =
@@ -161,7 +165,7 @@ public class ContainerLauncherImpl exten
// Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager.
final String containerManagerBindAddr = event.getContainerMgrAddress();
- ContainerID containerID = event.getContainerID();
+ ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
switch(event.getType()) {
@@ -181,7 +185,9 @@ public class ContainerLauncherImpl exten
// TODO: Make sure that child's mapred-local-dir is set correctly.
// Now launch the actual container
- proxy.startContainer(containerLaunchContext);
+ StartContainerRequest startRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ proxy.startContainer(startRequest);
// after launching send launched event to taskattempt
context.getEventHandler().handle(
@@ -201,7 +207,7 @@ public class ContainerLauncherImpl exten
// and not yet processed
if (eventQueue.contains(event)) {
eventQueue.remove(event); // TODO: Any synchro needed?
- // TODO: raise any event?
+ // k: raise any event?
} else {
try {
ContainerManager proxy =
@@ -209,8 +215,13 @@ public class ContainerLauncherImpl exten
// TODO:check whether container is launched
// kill the remote container if already launched
- proxy.stopContainer(event.getContainerID());
- proxy.cleanupContainer(event.getContainerID());
+ StopContainerRequest stopRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(event.getContainerID());
+ proxy.stopContainer(stopRequest);
+
+ CleanupContainerRequest cleanupRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(CleanupContainerRequest.class);
+ cleanupRequest.setContainerId(event.getContainerID());
+ proxy.cleanupContainer(cleanupRequest);
} catch (Exception e) {
//ignore the cleanup failure
LOG.warn("cleanup failed for container " + event.getContainerID() ,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Thu Mar 31 22:23:22 2011
@@ -19,15 +19,15 @@
package org.apache.hadoop.mapreduce.v2.app.launcher;
import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
- public ContainerRemoteLaunchEvent(TaskAttemptID taskAttemptID,
- ContainerID containerID, String containerMgrAddress,
+ public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
+ ContainerId containerID, String containerMgrAddress,
ContainerToken containerToken) {
super(taskAttemptID, containerID, containerMgrAddress,
containerToken,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java Thu Mar 31 22:23:22 2011
@@ -19,20 +19,20 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class ContainerAllocatorEvent extends
AbstractEvent<ContainerAllocator.EventType> {
- private TaskAttemptID attemptID;
+ private TaskAttemptId attemptID;
- public ContainerAllocatorEvent(TaskAttemptID attemptID,
+ public ContainerAllocatorEvent(TaskAttemptId attemptID,
ContainerAllocator.EventType type) {
super(type);
this.attemptID = attemptID;
}
- public TaskAttemptID getAttemptID() {
+ public TaskAttemptId getAttemptID() {
return attemptID;
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Thu Mar 31 22:23:22 2011
@@ -18,9 +18,11 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
public class ContainerRequestEvent extends ContainerAllocatorEvent {
@@ -29,13 +31,13 @@ public class ContainerRequestEvent exten
private String[] hosts;
private String[] racks;
- public ContainerRequestEvent(TaskAttemptID attemptID,
+ public ContainerRequestEvent(TaskAttemptId attemptID,
Resource capability, int priority,
String[] hosts, String[] racks) {
super(attemptID, ContainerAllocator.EventType.CONTAINER_REQ);
this.capability = capability;
- this.priority = new Priority();
- this.priority.priority = priority;
+ this.priority = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
+ this.priority.setPriority(priority);
this.hosts = hosts;
this.racks = racks;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Mar 31 22:23:22 2011
@@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -46,24 +47,31 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.AMRMProtocol;
-import org.apache.hadoop.yarn.AMResponse;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationState;
-import org.apache.hadoop.yarn.ApplicationStatus;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+
/**
* Allocates the container from the ResourceManager scheduler.
@@ -74,7 +82,7 @@ implements ContainerAllocator {
LogFactory.getLog(RMContainerAllocator.class);
private static final String ANY = "*";
private static int rmPollInterval;//millis
- private ApplicationID applicationId;
+ private ApplicationId applicationId;
private EventHandler eventHandler;
private volatile boolean stopped;
protected Thread allocatorThread;
@@ -82,10 +90,12 @@ implements ContainerAllocator {
private AMRMProtocol scheduler;
private final ClientService clientService;
private int lastResponseID = 0;
+
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
//mapping for assigned containers
- private final Map<ContainerID, TaskAttemptID> assignedMap =
- new HashMap<ContainerID, TaskAttemptID>();
+ private final Map<ContainerId, TaskAttemptId> assignedMap =
+ new HashMap<ContainerId, TaskAttemptId>();
private final Map<Priority,
Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue =
@@ -110,7 +120,7 @@ implements ContainerAllocator {
this.clientService = clientService;
this.applicationId = context.getApplicationID();
this.eventHandler = context.getEventHandler();
- this.applicationMaster = new ApplicationMaster();
+ this.applicationMaster = recordFactory.newRecordInstance(ApplicationMaster.class);
}
@Override
@@ -130,17 +140,18 @@ implements ContainerAllocator {
protected void register() {
//Register
- applicationMaster.applicationId = applicationId;
- applicationMaster.host =
- clientService.getBindAddress().getAddress().getHostAddress();
- applicationMaster.rpcPort = clientService.getBindAddress().getPort();
- applicationMaster.state = ApplicationState.RUNNING;
- applicationMaster.httpPort = clientService.getHttpPort();
- applicationMaster.status = new ApplicationStatus();
- applicationMaster.status.applicationId = applicationId;
- applicationMaster.status.progress = 0.0f;
+ applicationMaster.setApplicationId(applicationId);
+ applicationMaster.setHost(clientService.getBindAddress().getAddress().getHostAddress());
+ applicationMaster.setRpcPort(clientService.getBindAddress().getPort());
+ applicationMaster.setState(ApplicationState.RUNNING);
+ applicationMaster.setHttpPort(clientService.getHttpPort());
+ applicationMaster.setStatus(recordFactory.newRecordInstance(ApplicationStatus.class));
+ applicationMaster.getStatus().setApplicationId(applicationId);
+ applicationMaster.getStatus().setProgress(0.0f);
try {
- scheduler.registerApplicationMaster(applicationMaster);
+ RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+ request.setApplicationMaster(applicationMaster);
+ scheduler.registerApplicationMaster(request);
} catch(Exception are) {
LOG.info("Exception while registering", are);
throw new YarnException(are);
@@ -149,8 +160,10 @@ implements ContainerAllocator {
protected void unregister() {
try {
- applicationMaster.state = ApplicationState.COMPLETED;
- scheduler.finishApplicationMaster(applicationMaster);
+ applicationMaster.setState(ApplicationState.COMPLETED);
+ FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
+ request.setApplicationMaster(applicationMaster);
+ scheduler.finishApplicationMaster(request);
} catch(Exception are) {
LOG.info("Error while unregistering ", are);
}
@@ -300,20 +313,20 @@ implements ContainerAllocator {
}
ResourceRequest remoteRequest = reqMap.get(capability);
if (remoteRequest == null) {
- remoteRequest = new ResourceRequest();
- remoteRequest.priority = priority;
- remoteRequest.hostName = resourceName;
- remoteRequest.capability = capability;
- remoteRequest.numContainers = 0;
+ remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
+ remoteRequest.setPriority(priority);
+ remoteRequest.setHostName(resourceName);
+ remoteRequest.setCapability(capability);
+ remoteRequest.setNumContainers(0);
reqMap.put(capability, remoteRequest);
}
- remoteRequest.numContainers++;
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
// Note this down for next interaction with ResourceManager
ask.add(remoteRequest);
- LOG.info("addResourceRequest:" + " applicationId=" + applicationId.id
- + " priority=" + priority.priority + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.numContainers + " #asks="
+ LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
+ + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+ + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+ ask.size());
}
@@ -324,13 +337,13 @@ implements ContainerAllocator {
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
ResourceRequest remoteRequest = reqMap.get(capability);
- LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.id
- + " priority=" + priority.priority + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.numContainers + " #asks="
+ LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
+ + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+ + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+ ask.size());
- remoteRequest.numContainers--;
- if (remoteRequest.numContainers == 0) {
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+ if (remoteRequest.getNumContainers() == 0) {
reqMap.remove(capability);
if (reqMap.size() == 0) {
remoteRequests.remove(resourceName);
@@ -345,21 +358,25 @@ implements ContainerAllocator {
//already have it.
}
- LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.id
- + " priority=" + priority.priority + " resourceName=" + resourceName
- + " numContainers=" + remoteRequest.numContainers + " #asks="
+ LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.getId()
+ + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+ + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
+ ask.size());
}
private List<Container> getResources() throws Exception {
- ApplicationStatus status = new ApplicationStatus();
- status.applicationId = applicationId;
- status.responseID = lastResponseID;
- AMResponse response =
- scheduler.allocate(status,
- new ArrayList(ask), new ArrayList(release));
- lastResponseID = response.responseId;
- List<Container> allContainers = response.containers;
+ ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
+ status.setApplicationId(applicationId);
+ status.setResponseId(lastResponseID);
+
+ AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class);
+ allocateRequest.setApplicationStatus(status);
+ allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
+ allocateRequest.addAllReleases(new ArrayList<Container>(release));
+ AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+ AMResponse response = allocateResponse.getAMResponse();
+ lastResponseID = response.getResponseId();
+ List<Container> allContainers = response.getContainerList();
ask.clear();
release.clear();
@@ -369,15 +386,15 @@ implements ContainerAllocator {
" recieved=" + allContainers.size());
List<Container> allocatedContainers = new ArrayList<Container>();
for (Container cont : allContainers) {
- if (cont.state != ContainerState.COMPLETE) {
+ if (cont.getState() != ContainerState.COMPLETE) {
allocatedContainers.add(cont);
LOG.debug("Received Container :" + cont);
} else {
LOG.info("Received completed container " + cont);
- TaskAttemptID attemptID = assignedMap.remove(cont.id);
+ TaskAttemptId attemptID = assignedMap.remove(cont.getId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id " +
- cont.id);
+ cont.getId());
} else {
//send the container completed event to Task attempt
eventHandler.handle(new TaskAttemptEvent(attemptID,
@@ -413,8 +430,8 @@ implements ContainerAllocator {
private void assign(Priority priority, List<Container> allocatedContainers) {
for (Iterator<Container> i=allocatedContainers.iterator(); i.hasNext();) {
Container allocatedContainer = i.next();
- String host = allocatedContainer.hostName.toString();
- Resource capability = allocatedContainer.resource;
+ String host = allocatedContainer.getHostName();
+ Resource capability = allocatedContainer.getResource();
LinkedList<ContainerRequestEvent> requestList =
localRequestsQueue.get(priority).get(capability);
@@ -455,15 +472,15 @@ implements ContainerAllocator {
//send the container assigned event to Task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(assigned
- .getAttemptID(), allocatedContainer.id,
- allocatedContainer.hostName.toString(),
- allocatedContainer.containerToken));
+ .getAttemptID(), allocatedContainer.getId(),
+ allocatedContainer.getHostName(),
+ allocatedContainer.getContainerToken()));
- assignedMap.put(allocatedContainer.id, assigned.getAttemptID());
+ assignedMap.put(allocatedContainer.getId(), assigned.getAttemptID());
LOG.info("Assigned container (" + allocatedContainer + ") " +
" to task " + assigned.getAttemptID() + " at priority " + priority +
- " on node " + allocatedContainer.hostName.toString());
+ " on node " + allocatedContainer.getHostName());
}
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java Thu Mar 31 22:23:22 2011
@@ -33,9 +33,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.ContainerID;
/**
* Reads the static list of NodeManager from config file and allocate
@@ -137,7 +138,7 @@ public class StaticContainerAllocator ex
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
if (nextIndex < containerMgrList.size()) {
String containerMgr = containerMgrList.get(nextIndex);
- ContainerID containerID = generateContainerID();
+ ContainerId containerID = generateContainerID();
context.getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(
@@ -147,10 +148,10 @@ public class StaticContainerAllocator ex
}
}
- private ContainerID generateContainerID() {
- ContainerID cId = new ContainerID();
- cId.appID = context.getApplicationID();
- cId.id = containerCount++;
+ private ContainerId generateContainerID() {
+ ContainerId cId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
+ cId.setAppId(context.getApplicationID());
+ cId.setId(containerCount++);
return cId;
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Mar 31 22:23:22 2011
@@ -33,9 +33,13 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.Clock;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -47,10 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
public class DefaultSpeculator extends AbstractService implements
Speculator {
@@ -71,8 +72,8 @@ public class DefaultSpeculator extends A
private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
- private final ConcurrentMap<TaskID, Boolean> runningTasks
- = new ConcurrentHashMap<TaskID, Boolean>();
+ private final ConcurrentMap<TaskId, Boolean> runningTasks
+ = new ConcurrentHashMap<TaskId, Boolean>();
private final Map<Task, AtomicBoolean> pendingSpeculations
= new ConcurrentHashMap<Task, AtomicBoolean>();
@@ -80,12 +81,12 @@ public class DefaultSpeculator extends A
// These are the current needs, not the initial needs. For each job, these
// record the number of attempts that exist and that are actively
// waiting for a container [as opposed to running or finished]
- private final ConcurrentMap<JobID, AtomicInteger> mapContainerNeeds
- = new ConcurrentHashMap<JobID, AtomicInteger>();
- private final ConcurrentMap<JobID, AtomicInteger> reduceContainerNeeds
- = new ConcurrentHashMap<JobID, AtomicInteger>();
+ private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds
+ = new ConcurrentHashMap<JobId, AtomicInteger>();
+ private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
+ = new ConcurrentHashMap<JobId, AtomicInteger>();
- private final Set<TaskID> mayHaveSpeculated = new HashSet();
+ private final Set<TaskId> mayHaveSpeculated = new HashSet();
private final Configuration conf;
private AppContext context;
@@ -239,11 +240,11 @@ public class DefaultSpeculator extends A
// This section contains the code that gets run for a SpeculatorEvent
- private AtomicInteger containerNeed(TaskID taskID) {
- JobID jobID = taskID.jobID;
- TaskType taskType = taskID.taskType;
+ private AtomicInteger containerNeed(TaskId taskID) {
+ JobId jobID = taskID.getJobId();
+ TaskType taskType = taskID.getTaskType();
- ConcurrentMap<JobID, AtomicInteger> relevantMap
+ ConcurrentMap<JobId, AtomicInteger> relevantMap
= taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
AtomicInteger result = relevantMap.get(jobID);
@@ -289,9 +290,9 @@ public class DefaultSpeculator extends A
String stateString = reportedStatus.stateString.toString();
- TaskAttemptID attemptID = reportedStatus.id;
- TaskID taskID = attemptID.taskID;
- Job job = context.getJob(taskID.jobID);
+ TaskAttemptId attemptID = reportedStatus.id;
+ TaskId taskID = attemptID.getTaskId();
+ Job job = context.getJob(taskID.getJobId());
if (job == null) {
return;
@@ -335,10 +336,10 @@ public class DefaultSpeculator extends A
//
// All of these values are negative. Any value that should be allowed to
// speculate is 0 or positive.
- private long speculationValue(TaskID taskID, long now) {
- Job job = context.getJob(taskID.jobID);
+ private long speculationValue(TaskId taskID, long now) {
+ Job job = context.getJob(taskID.getJobId());
Task task = job.getTask(taskID);
- Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+ Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
long acceptableRuntime = Long.MIN_VALUE;
long result = Long.MIN_VALUE;
@@ -349,7 +350,7 @@ public class DefaultSpeculator extends A
}
}
- TaskAttemptID runningTaskAttemptID = null;
+ TaskAttemptId runningTaskAttemptID = null;
int numberRunningAttempts = 0;
@@ -407,7 +408,7 @@ public class DefaultSpeculator extends A
}
//Add attempt to a given Task.
- protected void addSpeculativeAttempt(TaskID taskID) {
+ protected void addSpeculativeAttempt(TaskId taskID) {
System.out.println
("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
@@ -433,10 +434,10 @@ public class DefaultSpeculator extends A
long now = clock.getTime();
- ConcurrentMap<JobID, AtomicInteger> containerNeeds
+ ConcurrentMap<JobId, AtomicInteger> containerNeeds
= type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
- for (ConcurrentMap.Entry<JobID, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
+ for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
// This race conditon is okay. If we skip a speculation attempt we
// should have tried because the event that lowers the number of
// containers needed to zero hasn't come through, it will next time.
@@ -444,7 +445,7 @@ public class DefaultSpeculator extends A
// zero but increased due to a failure it's not too bad to launch one
// container prematurely.
if (jobEntry.getValue().get() > 0) {
- break;
+ continue;
}
int numberSpeculationsAlready = 0;
@@ -453,18 +454,18 @@ public class DefaultSpeculator extends A
// loop through the tasks of the kind
Job job = context.getJob(jobEntry.getKey());
- Map<TaskID, Task> tasks = job.getTasks(type);
+ Map<TaskId, Task> tasks = job.getTasks(type);
int numberAllowedSpeculativeTasks
= (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
- TaskID bestTaskID = null;
+ TaskId bestTaskID = null;
long bestSpeculationValue = -1L;
// this loop is potentially pricey.
// TODO track the tasks that are potentially worth looking at
- for (Map.Entry<TaskID, Task> taskEntry : tasks.entrySet()) {
+ for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
if (mySpeculationValue == ALREADY_SPECULATING) {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Thu Mar 31 22:23:22 2011
@@ -20,15 +20,13 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
/*
* This estimator exponentially smooths the rate of progress vrs. wallclock
@@ -37,8 +35,8 @@ import org.apache.hadoop.mapreduce.v2.ap
*/
public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
- private final ConcurrentMap<TaskAttemptID, AtomicReference<EstimateVector>> estimates
- = new ConcurrentHashMap<TaskAttemptID, AtomicReference<EstimateVector>>();
+ private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates
+ = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>();
private SmoothedValue smoothedValue;
@@ -94,7 +92,7 @@ public class ExponentiallySmoothedTaskRu
}
private void incorporateReading
- (TaskAttemptID attemptID, float newProgress, long newTime) {
+ (TaskAttemptId attemptID, float newProgress, long newTime) {
AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
if (vectorRef == null) {
@@ -119,7 +117,7 @@ public class ExponentiallySmoothedTaskRu
}
}
- private EstimateVector getEstimateVector(TaskAttemptID attemptID) {
+ private EstimateVector getEstimateVector(TaskAttemptId attemptID) {
AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
if (vectorRef == null) {
@@ -145,7 +143,7 @@ public class ExponentiallySmoothedTaskRu
}
@Override
- public long estimatedRuntime(TaskAttemptID id) {
+ public long estimatedRuntime(TaskAttemptId id) {
Long startTime = startTimes.get(id);
if (startTime == null) {
@@ -179,13 +177,13 @@ public class ExponentiallySmoothedTaskRu
}
@Override
- public long runtimeEstimateVariance(TaskAttemptID id) {
+ public long runtimeEstimateVariance(TaskAttemptId id) {
return -1L;
}
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
- TaskAttemptID attemptID = status.id;
+ TaskAttemptId attemptID = status.id;
float progress = status.progress;
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java Thu Mar 31 22:23:22 2011
@@ -22,15 +22,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
@@ -48,9 +48,9 @@ public class LegacyTaskRuntimeEstimator
String stateString = status.stateString.toString();
- TaskAttemptID attemptID = status.id;
- TaskID taskID = attemptID.taskID;
- JobID jobID = taskID.jobID;
+ TaskAttemptId attemptID = status.id;
+ TaskId taskID = attemptID.getTaskId();
+ JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
@@ -121,9 +121,9 @@ public class LegacyTaskRuntimeEstimator
}
private long storedPerAttemptValue
- (Map<TaskAttempt, AtomicLong> data, TaskAttemptID attemptID) {
- TaskID taskID = attemptID.taskID;
- JobID jobID = taskID.jobID;
+ (Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
+ TaskId taskID = attemptID.getTaskId();
+ JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
@@ -145,12 +145,12 @@ public class LegacyTaskRuntimeEstimator
}
@Override
- public long estimatedRuntime(TaskAttemptID attemptID) {
+ public long estimatedRuntime(TaskAttemptId attemptID) {
return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
}
@Override
- public long runtimeEstimateVariance(TaskAttemptID attemptID) {
+ public long runtimeEstimateVariance(TaskAttemptId attemptID) {
return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java Thu Mar 31 22:23:22 2011
@@ -19,10 +19,10 @@
package org.apache.hadoop.mapreduce.v2.app.speculate;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
/*
@@ -36,7 +36,7 @@ public class NullTaskRuntimesEngine impl
}
@Override
- public long attemptEnrolledTime(TaskAttemptID attemptID) {
+ public long attemptEnrolledTime(TaskAttemptId attemptID) {
return Long.MAX_VALUE;
}
@@ -51,21 +51,21 @@ public class NullTaskRuntimesEngine impl
}
@Override
- public long thresholdRuntime(TaskID id) {
+ public long thresholdRuntime(TaskId id) {
return Long.MAX_VALUE;
}
@Override
- public long estimatedRuntime(TaskAttemptID id) {
+ public long estimatedRuntime(TaskAttemptId id) {
return -1L;
}
@Override
- public long estimatedNewAttemptRuntime(TaskID id) {
+ public long estimatedNewAttemptRuntime(TaskId id) {
return -1L;
}
@Override
- public long runtimeEstimateVariance(TaskAttemptID id) {
+ public long runtimeEstimateVariance(TaskAttemptId id) {
return -1L;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java Thu Mar 31 22:23:22 2011
@@ -20,8 +20,8 @@ package org.apache.hadoop.mapreduce.v2.a
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
@@ -29,7 +29,7 @@ public class SpeculatorEvent extends Abs
private TaskAttemptStatus reportedStatus;
// valid for TASK_CONTAINER_NEED_UPDATE
- private TaskID taskID;
+ private TaskId taskID;
private int containersNeededChange;
@@ -39,7 +39,7 @@ public class SpeculatorEvent extends Abs
this.reportedStatus = reportedStatus;
}
- public SpeculatorEvent(TaskAttemptID attemptID, boolean flag, long timestamp) {
+ public SpeculatorEvent(TaskAttemptId attemptID, boolean flag, long timestamp) {
super(Speculator.EventType.ATTEMPT_START, timestamp);
this.reportedStatus = new TaskAttemptStatus();
this.reportedStatus.id = attemptID;
@@ -54,7 +54,7 @@ public class SpeculatorEvent extends Abs
* speculation wouldn't compete for containers with tasks which need
* to be run.
*/
- public SpeculatorEvent(TaskID taskID, int containersNeededChange) {
+ public SpeculatorEvent(TaskId taskID, int containersNeededChange) {
super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
this.taskID = taskID;
this.containersNeededChange = containersNeededChange;
@@ -68,7 +68,7 @@ public class SpeculatorEvent extends Abs
return containersNeededChange;
}
- public TaskID getTaskID() {
+ public TaskId getTaskID() {
return taskID;
}
}
\ No newline at end of file
|