hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1087462 [3/20] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/h...
Date Thu, 31 Mar 2011 22:23:34 GMT
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Mar 31 22:23:22 2011
@@ -83,32 +83,33 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ContainerBuilderHelper;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalResourceType;
-import org.apache.hadoop.yarn.LocalResourceVisibility;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.URL;
-import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.Phase;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 
 /**
  * Implementation of TaskAttempt interface.
@@ -119,18 +120,19 @@ public abstract class TaskAttemptImpl im
       EventHandler<TaskAttemptEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   protected final Configuration conf;
   protected final Path jobFile;
   protected final int partition;
   protected final EventHandler eventHandler;
-  private final TaskAttemptID attemptId;
+  private final TaskAttemptId attemptId;
   private final org.apache.hadoop.mapred.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
   private final OutputCommitter committer;
   private final Resource resourceCapability;
   private final String[] dataLocalHosts;
-  private final List<CharSequence> diagnostics = new ArrayList<CharSequence>();
+  private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
   private Collection<Token<? extends TokenIdentifier>> fsTokens;
@@ -360,7 +362,7 @@ public abstract class TaskAttemptImpl im
          <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
     stateMachine;
 
-  private ContainerID containerID;
+  private ContainerId containerID;
   private String containerMgrAddress;
   private WrappedJvmID jvmID;
   private ContainerToken containerToken;
@@ -372,16 +374,16 @@ public abstract class TaskAttemptImpl im
   //this is the last status reported by the REMOTE running attempt
   private TaskAttemptStatus reportedStatus;
 
-  public TaskAttemptImpl(TaskID taskId, int i, EventHandler eventHandler,
+  public TaskAttemptImpl(TaskId taskId, int i, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
       Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
       Collection<Token<? extends TokenIdentifier>> fsTokens) {
-    oldJobId = TypeConverter.fromYarn(taskId.jobID);
+    oldJobId = TypeConverter.fromYarn(taskId.getJobId());
     this.conf = conf;
-    attemptId = new TaskAttemptID();
-    attemptId.taskID = taskId;
-    attemptId.id = i;
+    attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
+    attemptId.setTaskId(taskId);
+    attemptId.setId(i);
     this.taskAttemptListener = taskAttemptListener;
 
     // Initialize reportedStatus
@@ -400,8 +402,8 @@ public abstract class TaskAttemptImpl im
     this.partition = partition;
 
     //TODO:create the resource reqt for this Task attempt
-    this.resourceCapability = new Resource();
-    this.resourceCapability.memory =  getMemoryRequired(conf, taskId.taskType);
+    this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
+    this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
     this.dataLocalHosts = dataLocalHosts;
 
     // This "this leak" is okay because the retained pointer is in an
@@ -424,36 +426,31 @@ public abstract class TaskAttemptImpl im
       LocalResourceType type, LocalResourceVisibility visibility) 
   throws IOException {
     FileStatus fstat = fc.getFileStatus(file);
-    LocalResource resource = new LocalResource();
-    resource.resource = AvroUtil.getYarnUrlFromPath(fstat.getPath());
-    resource.type = type;
-    resource.state = visibility;
-    resource.size = fstat.getLen();
-    resource.timestamp = fstat.getModificationTime();
+    LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
+    resource.setResource(ConverterUtils.getYarnUrlFromPath(fstat.getPath()));
+    resource.setType(type);
+    resource.setVisibility(visibility);
+    resource.setSize(fstat.getLen());
+    resource.setTimestamp(fstat.getModificationTime());
     return resource;
   }
   
   private ContainerLaunchContext getContainer() {
 
-    ContainerLaunchContext container = new ContainerLaunchContext();
-    container.serviceData = new HashMap<CharSequence, ByteBuffer>();
-    container.resources = new HashMap<CharSequence, LocalResource>();
-    container.env = new HashMap<CharSequence, CharSequence>();
-    
+    ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
 
     try {
       FileContext remoteFS = FileContext.getFileContext(conf);
       
       Path localizedJobConf = new Path(YARNApplicationConstants.JOB_CONF_FILE);
       remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!!
-      URL jobConfFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(localizedJobConf);
+      URL jobConfFileOnRemoteFS = ConverterUtils.getYarnUrlFromPath(localizedJobConf);
       LOG.info("The job-conf file on the remote FS is " + jobConfFileOnRemoteFS);
       
       Path jobJar = remoteFS.makeQualified(new Path(remoteTask.getConf().get(MRJobConfig.JAR)));
-      URL jobJarFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(jobJar);
-      container.resources.put(YARNApplicationConstants.JOB_JAR,
-          getLocalResource(remoteFS, jobJar, 
-              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      URL jobJarFileOnRemoteFS = ConverterUtils.getYarnUrlFromPath(jobJar);
+      container.setLocalResource(YARNApplicationConstants.JOB_JAR, getLocalResource(remoteFS, jobJar, 
+          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
       LOG.info("The job-jar file on the remote FS is " + jobJarFileOnRemoteFS);
 
       Path jobSubmitDir =
@@ -463,13 +460,14 @@ public abstract class TaskAttemptImpl im
           remoteFS.makeQualified(new Path(jobSubmitDir,
               YarnConfiguration.APPLICATION_TOKENS_FILE));
       URL applicationTokenFileOnRemoteFS =
-          AvroUtil.getYarnUrlFromPath(jobTokenFile);
+          ConverterUtils.getYarnUrlFromPath(jobTokenFile);
       // TODO: Looks like this is not needed. Revisit during localization
       // cleanup.
       //container.resources_todo.put(YarnConfiguration.APPLICATION_TOKENS_FILE,
       //    getLocalResource(remoteFS, jobTokenFile, 
       //        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-      container.resources.put(YARNApplicationConstants.JOB_CONF_FILE,
+      
+      container.setLocalResource(YARNApplicationConstants.JOB_CONF_FILE,
           getLocalResource(remoteFS,
             new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE),
             LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
@@ -499,25 +497,27 @@ public abstract class TaskAttemptImpl im
       LOG.info("Size of containertokens_dob is "
           + taskCredentials.numberOfTokens());
       taskCredentials.writeTokenStorageToStream(containerTokens_dob);
-      container.containerTokens =
+      container.setContainerTokens(
           ByteBuffer.wrap(containerTokens_dob.getData(), 0,
-              containerTokens_dob.getLength());
+              containerTokens_dob.getLength()));
 
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
       DataOutputBuffer jobToken_dob = new DataOutputBuffer();
       jobToken.write(jobToken_dob);
       // TODO: should depend on ShuffleHandler
-      container.serviceData.put("mapreduce.shuffle", 
+      container.setServiceData("mapreduce.shuffle", 
           ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()));
 
-      MRApps.setInitialClasspath(container.env);
+      Map<String, String> env = new HashMap<String, String>();
+      MRApps.setInitialClasspath(env);
+      container.addAllEnv(env);
     } catch (IOException e) {
       throw new YarnException(e);
     }
     
-    container.id = containerID;
-    container.user = conf.get(MRJobConfig.USER_NAME); // TODO: Fix
+    container.setContainerId(containerID);
+    container.setUser(conf.get(MRJobConfig.USER_NAME)); // TODO: Fix
 
     File workDir = new File(ContainerBuilderHelper.getWorkDir());
     String logDir = new File(workDir, "logs").toString();
@@ -535,18 +535,17 @@ public abstract class TaskAttemptImpl im
     classPaths.add(workDir.toString()); // TODO
 
     // Construct the actual Container
-    container.command =
-        MapReduceChildJVM.getVMCommand(taskAttemptListener.getAddress(),
-            remoteTask, javaHome, workDir.toString(), logDir, 
-            childTmpDir, jvmID);
+    container.addAllCommands(MapReduceChildJVM.getVMCommand(taskAttemptListener.getAddress(),
+        remoteTask, javaHome, workDir.toString(), logDir, 
+        childTmpDir, jvmID));
 
-    MapReduceChildJVM.setVMEnv(container.env, classPaths, workDir.toString(),
+    MapReduceChildJVM.setVMEnv(container.getAllEnv(), classPaths, workDir.toString(),
         nmLdLibraryPath, remoteTask, localizedApplicationTokensFile);
 
     // Construct the actual Container
-    container.id = containerID;
-    container.user = conf.get(MRJobConfig.USER_NAME);
-    container.resource = resourceCapability;
+    container.setContainerId(containerID);
+    container.setUser(conf.get(MRJobConfig.USER_NAME));
+    container.setResource(resourceCapability);
     return container;
   }
 
@@ -606,7 +605,7 @@ public abstract class TaskAttemptImpl im
         if (name.isAbsolute()) {
           throw new IllegalArgumentException("Resource name must be relative");
         }
-        container.resources.put(
+        container.setLocalResource(
             name.toUri().getPath(),
             BuilderUtils.newLocalResource(
                 uris[i], type, 
@@ -625,13 +624,13 @@ public abstract class TaskAttemptImpl im
   private static final String CLASSPATH = "CLASSPATH";
   private static void addCacheArtifactToClassPath(
       ContainerLaunchContext container, String fileName) {
-    CharSequence classpath = container.env.get(CLASSPATH);
+    String classpath = container.getEnv(CLASSPATH);
     if (classpath == null) {
       classpath = fileName;
     } else {
       classpath = classpath + ":" + fileName;
     }
-    container.env.put(CLASSPATH, classpath);
+    container.setEnv(CLASSPATH, classpath);
   }
   
   // TODO - Move this to MR!
@@ -648,7 +647,7 @@ public abstract class TaskAttemptImpl im
   }
   
   @Override
-  public ContainerID getAssignedContainerID() {
+  public ContainerId getAssignedContainerID() {
     readLock.lock();
     try {
       return containerID;
@@ -694,7 +693,7 @@ public abstract class TaskAttemptImpl im
   protected abstract int getPriority();
 
   @Override
-  public TaskAttemptID getID() {
+  public TaskAttemptId getID() {
     return attemptId;
   }
 
@@ -713,20 +712,21 @@ public abstract class TaskAttemptImpl im
 
   @Override
   public TaskAttemptReport getReport() {
-    TaskAttemptReport result = new TaskAttemptReport();
+    TaskAttemptReport result = recordFactory.newRecordInstance(TaskAttemptReport.class);
     readLock.lock();
     try {
-      result.id = attemptId;
+      result.setTaskAttemptId(attemptId);
       //take the LOCAL state of attempt
       //DO NOT take from reportedStatus
-      result.state = getState();
-      result.progress = reportedStatus.progress;
-      result.startTime = launchTime;
-      result.finishTime = finishTime;
-      result.diagnosticInfo = reportedStatus.diagnosticInfo;
-      result.phase = reportedStatus.phase;
-      result.stateString = reportedStatus.stateString;
-      result.counters = getCounters();
+      
+      result.setTaskAttemptState(getState());
+      result.setProgress(reportedStatus.progress);
+      result.setStartTime(launchTime);
+      result.setFinishTime(finishTime);
+      result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
+      result.setPhase(reportedStatus.phase);
+      result.setStateString(reportedStatus.stateString);
+      result.setCounters(getCounters());
       return result;
     } finally {
       readLock.unlock();
@@ -734,8 +734,8 @@ public abstract class TaskAttemptImpl im
   }
 
   @Override
-  public List<CharSequence> getDiagnostics() {
-    List<CharSequence> result = new ArrayList<CharSequence>();
+  public List<String> getDiagnostics() {
+    List<String> result = new ArrayList<String>();
     readLock.lock();
     try {
       result.addAll(diagnostics);
@@ -751,8 +751,8 @@ public abstract class TaskAttemptImpl im
     try {
       Counters counters = reportedStatus.counters;
       if (counters == null) {
-        counters = new Counters();
-        counters.groups = new HashMap<CharSequence, CounterGroup>();
+        counters = recordFactory.newRecordInstance(Counters.class);
+//        counters.groups = new HashMap<String, CounterGroup>();
       }
       return counters;
     } finally {
@@ -792,9 +792,9 @@ public abstract class TaskAttemptImpl im
       } catch (InvalidStateTransitonException e) {
         LOG.error("Can't handle this event at current state", e);
         eventHandler.handle(new JobDiagnosticsUpdateEvent(
-            this.attemptId.taskID.jobID, "Invalid event " + event.getType() + 
+            this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() + 
             " on TaskAttempt " + this.attemptId));
-        eventHandler.handle(new JobEvent(this.attemptId.taskID.jobID,
+        eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
             JobEventType.INTERNAL_ERROR));
       }
       if (oldState != getState()) {
@@ -823,7 +823,7 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       // Tell any speculator that we're requesting a container
       taskAttempt.eventHandler.handle
-          (new SpeculatorEvent(taskAttempt.getID().taskID, +1));
+          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
       //request for container
       taskAttempt.eventHandler.handle(
           new ContainerRequestEvent(taskAttempt.attemptId, 
@@ -845,7 +845,7 @@ public abstract class TaskAttemptImpl im
       taskAttempt.remoteTask = taskAttempt.createRemoteTask();
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
-          taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.id);
+          taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
       
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -865,7 +865,7 @@ public abstract class TaskAttemptImpl im
 
       // send event to speculator that our container needs are satisfied
       taskAttempt.eventHandler.handle
-          (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
     }
   }
 
@@ -892,7 +892,7 @@ public abstract class TaskAttemptImpl im
       //  we're transitioning out of UNASSIGNED
       if (withdrawsContainerRequest) {
         taskAttempt.eventHandler.handle
-            (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+            (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
       }
 
       switch(finalState) {
@@ -923,11 +923,11 @@ public abstract class TaskAttemptImpl im
           taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
       TaskAttemptStartedEvent tase =
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             taskAttempt.launchTime,
             "tracker", 0);
       taskAttempt.eventHandler.handle
-          (new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tase));
+          (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
       taskAttempt.eventHandler.handle
           (new SpeculatorEvent
               (taskAttempt.attemptId, true, System.currentTimeMillis()));
@@ -976,23 +976,23 @@ public abstract class TaskAttemptImpl im
       //set the finish time
       taskAttempt.setFinishTime();
       String taskType = 
-          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType).toString();
+          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
       LOG.info("In TaskAttemptImpl taskType: " + taskType);
       if (taskType.equals("MAP")) {
           MapAttemptFinishedEvent mfe =
             new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             TaskAttemptState.SUCCEEDED.toString(),
             taskAttempt.finishTime,
             taskAttempt.finishTime, "hostname",
             TaskAttemptState.SUCCEEDED.toString(),
             TypeConverter.fromYarn(taskAttempt.getCounters()),null);
             taskAttempt.eventHandler.handle(
-              new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+              new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
       } else {
           ReduceAttemptFinishedEvent rfe =
             new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-            TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+            TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             TaskAttemptState.SUCCEEDED.toString(),
             taskAttempt.finishTime,
             taskAttempt.finishTime,
@@ -1000,7 +1000,7 @@ public abstract class TaskAttemptImpl im
             TaskAttemptState.SUCCEEDED.toString(),
             TypeConverter.fromYarn(taskAttempt.getCounters()),null);
             taskAttempt.eventHandler.handle(
-              new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+              new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
       }
           /*
       TaskAttemptFinishedEvent tfe =
@@ -1029,28 +1029,28 @@ public abstract class TaskAttemptImpl im
       TaskAttemptUnsuccessfulCompletionEvent ta =
           new TaskAttemptUnsuccessfulCompletionEvent(
           TypeConverter.fromYarn(taskAttempt.attemptId),
-          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
           TaskAttemptState.FAILED.toString(),
           taskAttempt.finishTime,
           "hostname",
           taskAttempt.reportedStatus.diagnosticInfo.toString());
       taskAttempt.eventHandler.handle(
-          new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, ta));
-      if (taskAttempt.attemptId.taskID.taskType == TaskType.MAP) {
+          new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta));
+      if (taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP) {
         MapAttemptFinishedEvent mfe =
            new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-           TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
            TaskAttemptState.FAILED.toString(),
            taskAttempt.finishTime,
            taskAttempt.finishTime, "hostname",
            TaskAttemptState.FAILED.toString(),
            TypeConverter.fromYarn(taskAttempt.getCounters()),null);
            taskAttempt.eventHandler.handle(
-             new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+             new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe));
       } else {
          ReduceAttemptFinishedEvent rfe =
            new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-           TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
            TaskAttemptState.FAILED.toString(),
            taskAttempt.finishTime,
            taskAttempt.finishTime,
@@ -1058,7 +1058,7 @@ public abstract class TaskAttemptImpl im
            TaskAttemptState.FAILED.toString(),
            TypeConverter.fromYarn(taskAttempt.getCounters()),null);
            taskAttempt.eventHandler.handle(
-             new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+             new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe));
       }
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
@@ -1090,13 +1090,13 @@ public abstract class TaskAttemptImpl im
       TaskAttemptUnsuccessfulCompletionEvent tke =
           new TaskAttemptUnsuccessfulCompletionEvent(
           TypeConverter.fromYarn(taskAttempt.attemptId),
-          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
           TaskAttemptState.KILLED.toString(),
           taskAttempt.finishTime,
           TaskAttemptState.KILLED.toString(),
           taskAttempt.reportedStatus.diagnosticInfo.toString());
       taskAttempt.eventHandler.handle(
-          new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tke));
+          new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke));
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));
@@ -1121,7 +1121,7 @@ public abstract class TaskAttemptImpl im
     }
   }
 
-  private void addDiagnosticInfo(CharSequence diag) {
+  private void addDiagnosticInfo(String diag) {
     if (diag != null && !diag.equals("")) {
       diagnostics.add(diag);
     }
@@ -1175,8 +1175,8 @@ public abstract class TaskAttemptImpl im
     result.diagnosticInfo = new String("");
     result.phase = Phase.STARTING;
     result.stateString = new String("NEW");
-    Counters counters = new Counters();
-    counters.groups = new HashMap<CharSequence, CounterGroup>();
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+//    counters.groups = new HashMap<String, CounterGroup>();
     result.counters = counters;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Mar 31 22:23:22 2011
@@ -40,6 +40,16 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -57,21 +67,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
 
 /**
  * Implementation of Task interface.
@@ -86,11 +88,14 @@ public abstract class TaskImpl implement
   protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
   protected final EventHandler eventHandler;
-  private final TaskID taskId;
-  private Map<TaskAttemptID, TaskAttempt> attempts;
+  private final TaskId taskId;
+  private Map<TaskAttemptId, TaskAttempt> attempts;
   private final int maxAttempts;
   private final Lock readLock;
   private final Lock writeLock;
+  
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  
   protected Collection<Token<? extends TokenIdentifier>> fsTokens;
   protected Token<JobTokenIdentifier> jobToken;
   
@@ -198,9 +203,9 @@ public abstract class TaskImpl implement
 
   //should be set to one which comes first
   //saying COMMIT_PENDING
-  private TaskAttemptID commitAttempt;
+  private TaskAttemptId commitAttempt;
 
-  private TaskAttemptID successfulAttempt;
+  private TaskAttemptId successfulAttempt;
 
   private int failedAttempts;
   private int finishedAttempts;//finish are total of success, failed and killed
@@ -210,7 +215,7 @@ public abstract class TaskImpl implement
     return stateMachine.getCurrentState();
   }
 
-  public TaskImpl(JobID jobId, TaskType taskType, int partition,
+  public TaskImpl(JobId jobId, TaskType taskType, int partition,
       EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
       TaskAttemptListener taskAttemptListener, OutputCommitter committer,
       Token<JobTokenIdentifier> jobToken,
@@ -225,10 +230,10 @@ public abstract class TaskImpl implement
     //  have a convention that none of the overrides depends on any
     //  fields that need initialization.
     maxAttempts = getMaxAttempts();
-    taskId = new TaskID();
-    taskId.jobID = jobId;
-    taskId.id = partition;
-    taskId.taskType = taskType;
+    taskId = recordFactory.newRecordInstance(TaskId.class);
+    taskId.setJobId(jobId);
+    taskId.setId(partition);
+    taskId.setTaskType(taskType);
     this.partition = partition;
     this.taskAttemptListener = taskAttemptListener;
     this.eventHandler = eventHandler;
@@ -242,7 +247,7 @@ public abstract class TaskImpl implement
   }
 
   @Override
-  public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+  public Map<TaskAttemptId, TaskAttempt> getAttempts() {
     readLock.lock();
 
     try {
@@ -250,8 +255,8 @@ public abstract class TaskImpl implement
         return attempts;
       }
       
-      Map<TaskAttemptID, TaskAttempt> result
-          = new LinkedHashMap<TaskAttemptID, TaskAttempt>();
+      Map<TaskAttemptId, TaskAttempt> result
+          = new LinkedHashMap<TaskAttemptId, TaskAttempt>();
       result.putAll(attempts);
 
       return result;
@@ -261,7 +266,7 @@ public abstract class TaskImpl implement
   }
 
   @Override
-  public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+  public TaskAttempt getAttempt(TaskAttemptId attemptID) {
     readLock.lock();
     try {
       return attempts.get(attemptID);
@@ -271,7 +276,7 @@ public abstract class TaskImpl implement
   }
 
   @Override
-  public TaskID getID() {
+  public TaskId getID() {
     return taskId;
   }
 
@@ -290,24 +295,24 @@ public abstract class TaskImpl implement
 
   @Override
   public TaskReport getReport() {
-    TaskReport report = new TaskReport();
+    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
     readLock.lock();
     try {
-      report.id = taskId;
-      report.startTime = getLaunchTime();
-      report.finishTime = getFinishTime();
-      report.state = getState();
-      report.progress = getProgress();
-      report.counters = getCounters();
-      report.runningAttempts = new ArrayList<TaskAttemptID>();
-      report.runningAttempts.addAll(attempts.keySet());
-      report.successfulAttempt = successfulAttempt;
+      report.setTaskId(taskId);
+      report.setStartTime(getLaunchTime());
+      report.setFinishTime(getFinishTime());
+      report.setTaskState(getState());
+      report.setProgress(getProgress());
+      report.setCounters(getCounters());
+      
+      report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
+      report.setSuccessfulAttempt(successfulAttempt);
       
-      report.diagnostics = new ArrayList<CharSequence>();
       for (TaskAttempt att : attempts.values()) {
         String prefix = "AttemptID:" + att.getID() + " Info:";
         for (CharSequence cs : att.getDiagnostics()) {
-          report.diagnostics.add(prefix + cs);
+          report.addDiagnostics(prefix + cs);
+          
         }
       }
       return report;
@@ -325,8 +330,8 @@ public abstract class TaskImpl implement
       if (bestAttempt != null) {
         counters = bestAttempt.getCounters();
       } else {
-        counters = new Counters();
-        counters.groups = new HashMap<CharSequence, CounterGroup>();
+        counters = recordFactory.newRecordInstance(Counters.class);
+//        counters.groups = new HashMap<CharSequence, CounterGroup>();
       }
       return counters;
     } finally {
@@ -395,7 +400,7 @@ public abstract class TaskImpl implement
   }
 
   @Override
-  public boolean canCommit(TaskAttemptID taskAttemptID) {
+  public boolean canCommit(TaskAttemptId taskAttemptID) {
     readLock.lock();
     boolean canCommit = false;
     try {
@@ -437,7 +442,7 @@ public abstract class TaskImpl implement
         
       case 1:
         Map newAttempts
-            = new LinkedHashMap<TaskAttemptID, TaskAttempt>(maxAttempts);
+            = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
         newAttempts.putAll(attempts);
         attempts = newAttempts;
         attempts.put(attempt.getID(), attempt);
@@ -478,29 +483,27 @@ public abstract class TaskImpl implement
 
   private void internalError(TaskEventType type) {
     eventHandler.handle(new JobDiagnosticsUpdateEvent(
-        this.taskId.jobID, "Invalid event " + type + 
+        this.taskId.getJobId(), "Invalid event " + type + 
         " on Task " + this.taskId));
-    eventHandler.handle(new JobEvent(this.taskId.jobID,
+    eventHandler.handle(new JobEvent(this.taskId.getJobId(),
         JobEventType.INTERNAL_ERROR));
   }
 
   // always called inside a transition, in turn inside the Write Lock
-  private void handleTaskAttemptCompletion(TaskAttemptID attemptId,
+  private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
       TaskAttemptCompletionEventStatus status) {
     finishedAttempts++;
     TaskAttempt attempt = attempts.get(attemptId);
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
     if (attempt.getAssignedContainerMgrAddress() != null) {
-      TaskAttemptCompletionEvent tce = new TaskAttemptCompletionEvent();
-      tce.eventId = -1;
+      TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+      tce.setEventId(-1);
       //TODO: XXXXXX  hardcoded port
-      tce.mapOutputServerAddress =
-        "http://" + attempt.getAssignedContainerMgrAddress().split(":")[0]
-                                                              + ":8080";
-      tce.status = status;
-      tce.attemptId = attempt.getID();
-      tce.attemptRunTime = 0; // TODO: set the exact run time of the task.
+      tce.setMapOutputServerAddress("http://" + attempt.getAssignedContainerMgrAddress().split(":")[0] + ":8080");
+      tce.setStatus(status);
+      tce.setAttemptId(attempt.getID());
+      tce.setAttemptRunTime(0); // TODO: set the exact run time of the task.
       
       //raise the event to job so that it adds the completion event to its
       //data structures
@@ -515,8 +518,8 @@ public abstract class TaskImpl implement
     public void transition(TaskImpl task, TaskEvent event) {
       TaskStartedEvent tse = new TaskStartedEvent(TypeConverter
           .fromYarn(task.taskId), task.getLaunchTime(), TypeConverter
-          .fromYarn(task.taskId.taskType), TaskState.RUNNING.toString());
-      task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tse));
+          .fromYarn(task.taskId.getTaskType()), TaskState.RUNNING.toString());
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
 
       task.addAndScheduleAttempt();
     }
@@ -543,7 +546,7 @@ public abstract class TaskImpl implement
     public void transition(TaskImpl task, TaskEvent event) {
       TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
       // The nextAttemptNumber is commit pending, decide on set the commitAttempt
-      TaskAttemptID attemptID = ev.getTaskAttemptID();
+      TaskAttemptId attemptID = ev.getTaskAttemptID();
       if (task.commitAttempt == null) {
         // TODO: validate attemptID
         task.commitAttempt = attemptID;
@@ -576,10 +579,10 @@ public abstract class TaskImpl implement
       TaskFinishedEvent tfe =
           new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
             task.getFinishTime(),
-            TypeConverter.fromYarn(task.taskId.taskType),
+            TypeConverter.fromYarn(task.taskId.getTaskType()),
             TaskState.SUCCEEDED.toString(),
             TypeConverter.fromYarn(task.getCounters()));
-      task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
       for (TaskAttempt attempt : task.attempts.values()) {
         if (attempt.getID() != task.successfulAttempt &&
             // This is okay because it can only talk us out of sending a
@@ -654,10 +657,10 @@ public abstract class TaskImpl implement
         TaskFinishedEvent tfi =
             new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
                 task.getFinishTime(),
-            TypeConverter.fromYarn(task.taskId.taskType),
+            TypeConverter.fromYarn(task.taskId.getTaskType()),
                 TaskState.FAILED.toString(),
                 TypeConverter.fromYarn(task.getCounters()));
-        task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfi));
+        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfi));
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
         return TaskState.FAILED;
@@ -710,10 +713,10 @@ public abstract class TaskImpl implement
     public void transition(TaskImpl task, TaskEvent event) {
       TaskFinishedEvent tfe =
           new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
-              task.getFinishTime(), TypeConverter.fromYarn(task.taskId.taskType),
+              task.getFinishTime(), TypeConverter.fromYarn(task.taskId.getTaskType()),
               TaskState.KILLED.toString(), TypeConverter.fromYarn(task
                   .getCounters()));
-      task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
       task.eventHandler.handle(
           new JobTaskEvent(task.taskId, TaskState.KILLED));
     }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Thu Mar 31 22:23:22 2011
@@ -18,21 +18,21 @@
 
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 
 public class ContainerLauncherEvent 
     extends AbstractEvent<ContainerLauncher.EventType> {
 
-  private TaskAttemptID taskAttemptID;
-  private ContainerID containerID;
+  private TaskAttemptId taskAttemptID;
+  private ContainerId containerID;
   private String containerMgrAddress;
   private ContainerToken containerToken;
 
-  public ContainerLauncherEvent(TaskAttemptID taskAttemptID, 
-      ContainerID containerID,
+  public ContainerLauncherEvent(TaskAttemptId taskAttemptID, 
+      ContainerId containerID,
       String containerMgrAddress,
       ContainerToken containerToken,
       ContainerLauncher.EventType type) {
@@ -43,11 +43,11 @@ public class ContainerLauncherEvent 
     this.containerToken = containerToken;
   }
 
-  public TaskAttemptID getTaskAttemptID() {
+  public TaskAttemptId getTaskAttemptID() {
     return this.taskAttemptID;
   }
 
-  public ContainerID getContainerID() {
+  public ContainerId getContainerID() {
     return containerID;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Mar 31 22:23:22 2011
@@ -41,14 +41,18 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerToken;
 
 /**
  * This class is responsible for launching of containers.
@@ -117,7 +121,7 @@ public class ContainerLauncherImpl exten
     super.stop();
   }
 
-  protected ContainerManager getCMProxy(ContainerID containerID,
+  protected ContainerManager getCMProxy(ContainerId containerID,
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
@@ -125,10 +129,10 @@ public class ContainerLauncherImpl exten
     if (UserGroupInformation.isSecurityEnabled()) {
       Token<ContainerTokenIdentifier> token =
           new Token<ContainerTokenIdentifier>(
-              containerToken.identifier.array(),
-              containerToken.password.array(), new Text(
-                  containerToken.kind.toString()), new Text(
-                  containerToken.service.toString()));
+              containerToken.getIdentifier().array(),
+              containerToken.getPassword().array(), new Text(
+                  containerToken.getKind()), new Text(
+                  containerToken.getService()));
       user.addToken(token);
     }
     ContainerManager proxy =
@@ -161,7 +165,7 @@ public class ContainerLauncherImpl exten
       // Load ContainerManager tokens before creating a connection.
       // TODO: Do it only once per NodeManager.
       final String containerManagerBindAddr = event.getContainerMgrAddress();
-      ContainerID containerID = event.getContainerID();
+      ContainerId containerID = event.getContainerID();
       ContainerToken containerToken = event.getContainerToken();
 
       switch(event.getType()) {
@@ -181,7 +185,9 @@ public class ContainerLauncherImpl exten
           // TODO: Make sure that child's mapred-local-dir is set correctly.
 
           // Now launch the actual container
-          proxy.startContainer(containerLaunchContext);
+          StartContainerRequest startRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(StartContainerRequest.class);
+          startRequest.setContainerLaunchContext(containerLaunchContext);
+          proxy.startContainer(startRequest);
 
           // after launching send launched event to taskattempt
           context.getEventHandler().handle(
@@ -201,7 +207,7 @@ public class ContainerLauncherImpl exten
         // and not yet processed
         if (eventQueue.contains(event)) {
           eventQueue.remove(event); // TODO: Any synchro needed?
-          // TODO: raise any event?
+          // TODO: raise any event?
         } else {
           try {
             ContainerManager proxy = 
@@ -209,8 +215,13 @@ public class ContainerLauncherImpl exten
             // TODO:check whether container is launched
 
             // kill the remote container if already launched
-            proxy.stopContainer(event.getContainerID());
-            proxy.cleanupContainer(event.getContainerID());
+            StopContainerRequest stopRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(StopContainerRequest.class);
+            stopRequest.setContainerId(event.getContainerID());
+            proxy.stopContainer(stopRequest);
+            
+            CleanupContainerRequest cleanupRequest = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(CleanupContainerRequest.class);
+            cleanupRequest.setContainerId(event.getContainerID());
+            proxy.cleanupContainer(cleanupRequest);
           } catch (Exception e) {
             //ignore the cleanup failure
             LOG.warn("cleanup failed for container " + event.getContainerID() ,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Thu Mar 31 22:23:22 2011
@@ -19,15 +19,15 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
 
 public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
 
-  public ContainerRemoteLaunchEvent(TaskAttemptID taskAttemptID,
-      ContainerID containerID, String containerMgrAddress,
+  public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
+      ContainerId containerID, String containerMgrAddress,
       ContainerToken containerToken) {
     super(taskAttemptID, containerID, containerMgrAddress,
         containerToken,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocatorEvent.java Thu Mar 31 22:23:22 2011
@@ -19,20 +19,20 @@
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 
 public class ContainerAllocatorEvent extends 
     AbstractEvent<ContainerAllocator.EventType> {
   
-  private TaskAttemptID attemptID;
+  private TaskAttemptId attemptID;
 
-  public ContainerAllocatorEvent(TaskAttemptID attemptID,
+  public ContainerAllocatorEvent(TaskAttemptId attemptID,
       ContainerAllocator.EventType type) {
     super(type);
     this.attemptID = attemptID;
   }
 
-  public TaskAttemptID getAttemptID() {
+  public TaskAttemptId getAttemptID() {
     return attemptID;
   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Thu Mar 31 22:23:22 2011
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
 
 public class ContainerRequestEvent extends ContainerAllocatorEvent {
   
@@ -29,13 +31,13 @@ public class ContainerRequestEvent exten
   private String[] hosts;
   private String[] racks;
 
-  public ContainerRequestEvent(TaskAttemptID attemptID, 
+  public ContainerRequestEvent(TaskAttemptId attemptID, 
       Resource capability, int priority,
       String[] hosts, String[] racks) {
     super(attemptID, ContainerAllocator.EventType.CONTAINER_REQ);
     this.capability = capability;
-    this.priority = new Priority();
-    this.priority.priority = priority;
+    this.priority = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
+    this.priority.setPriority(priority);
     this.hosts = hosts;
     this.racks = racks;
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Mar 31 22:23:22 2011
@@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -46,24 +47,31 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.AMRMProtocol;
-import org.apache.hadoop.yarn.AMResponse;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationState;
-import org.apache.hadoop.yarn.ApplicationStatus;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.Priority;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceRequest;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+
 
 /**
  * Allocates the container from the ResourceManager scheduler.
@@ -74,7 +82,7 @@ implements ContainerAllocator {
     LogFactory.getLog(RMContainerAllocator.class);
   private static final String ANY = "*";
   private static int rmPollInterval;//millis
-  private ApplicationID applicationId;
+  private ApplicationId applicationId;
   private EventHandler eventHandler;
   private volatile boolean stopped;
   protected Thread allocatorThread;
@@ -82,10 +90,12 @@ implements ContainerAllocator {
   private AMRMProtocol scheduler;
   private final ClientService clientService;
   private int lastResponseID = 0;
+  
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   //mapping for assigned containers
-  private final Map<ContainerID, TaskAttemptID> assignedMap = 
-    new HashMap<ContainerID, TaskAttemptID>();
+  private final Map<ContainerId, TaskAttemptId> assignedMap = 
+    new HashMap<ContainerId, TaskAttemptId>();
 
   private final Map<Priority, 
   Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue = 
@@ -110,7 +120,7 @@ implements ContainerAllocator {
     this.clientService = clientService;
     this.applicationId = context.getApplicationID();
     this.eventHandler = context.getEventHandler();
-    this.applicationMaster = new ApplicationMaster();
+    this.applicationMaster = recordFactory.newRecordInstance(ApplicationMaster.class);
   }
 
   @Override
@@ -130,17 +140,18 @@ implements ContainerAllocator {
 
   protected void register() {
     //Register
-    applicationMaster.applicationId = applicationId;
-    applicationMaster.host =
-      clientService.getBindAddress().getAddress().getHostAddress();
-    applicationMaster.rpcPort = clientService.getBindAddress().getPort();
-    applicationMaster.state = ApplicationState.RUNNING;
-    applicationMaster.httpPort = clientService.getHttpPort();
-    applicationMaster.status = new ApplicationStatus();
-    applicationMaster.status.applicationId = applicationId;
-    applicationMaster.status.progress = 0.0f;
+    applicationMaster.setApplicationId(applicationId);
+    applicationMaster.setHost(clientService.getBindAddress().getAddress().getHostAddress());
+    applicationMaster.setRpcPort(clientService.getBindAddress().getPort());
+    applicationMaster.setState(ApplicationState.RUNNING);
+    applicationMaster.setHttpPort(clientService.getHttpPort());
+    applicationMaster.setStatus(recordFactory.newRecordInstance(ApplicationStatus.class));
+    applicationMaster.getStatus().setApplicationId(applicationId);
+    applicationMaster.getStatus().setProgress(0.0f);
     try {
-      scheduler.registerApplicationMaster(applicationMaster);
+      RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+      request.setApplicationMaster(applicationMaster);
+      scheduler.registerApplicationMaster(request);
     } catch(Exception are) {
       LOG.info("Exception while registering", are);
       throw new YarnException(are);
@@ -149,8 +160,10 @@ implements ContainerAllocator {
 
   protected void unregister() {
     try {
-      applicationMaster.state = ApplicationState.COMPLETED;
-      scheduler.finishApplicationMaster(applicationMaster);
+      applicationMaster.setState(ApplicationState.COMPLETED);
+      FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
+      request.setApplicationMaster(applicationMaster);
+      scheduler.finishApplicationMaster(request);
     } catch(Exception are) {
       LOG.info("Error while unregistering ", are);
     }
@@ -300,20 +313,20 @@ implements ContainerAllocator {
     }
     ResourceRequest remoteRequest = reqMap.get(capability);
     if (remoteRequest == null) {
-      remoteRequest = new ResourceRequest();
-      remoteRequest.priority = priority;
-      remoteRequest.hostName = resourceName;
-      remoteRequest.capability = capability;
-      remoteRequest.numContainers = 0;
+      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
+      remoteRequest.setPriority(priority);
+      remoteRequest.setHostName(resourceName);
+      remoteRequest.setCapability(capability);
+      remoteRequest.setNumContainers(0);
       reqMap.put(capability, remoteRequest);
     }
-    remoteRequest.numContainers++;
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
 
     // Note this down for next interaction with ResourceManager
     ask.add(remoteRequest);
-    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.id
-        + " priority=" + priority.priority + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.numContainers + " #asks="
+    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
         + ask.size());
   }
 
@@ -324,13 +337,13 @@ implements ContainerAllocator {
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
     ResourceRequest remoteRequest = reqMap.get(capability);
 
-    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.id
-        + " priority=" + priority.priority + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.numContainers + " #asks="
+    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
         + ask.size());
 
-    remoteRequest.numContainers--;
-    if (remoteRequest.numContainers == 0) {
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+    if (remoteRequest.getNumContainers() == 0) {
       reqMap.remove(capability);
       if (reqMap.size() == 0) {
         remoteRequests.remove(resourceName);
@@ -345,21 +358,25 @@ implements ContainerAllocator {
       //already have it.
     }
 
-    LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.id
-        + " priority=" + priority.priority + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.numContainers + " #asks="
+    LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.getId()
+        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
+        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
         + ask.size());
   }
 
   private List<Container> getResources() throws Exception {
-    ApplicationStatus status = new ApplicationStatus();
-    status.applicationId = applicationId;
-    status.responseID = lastResponseID;
-    AMResponse response = 
-      scheduler.allocate(status, 
-          new ArrayList(ask), new ArrayList(release));
-    lastResponseID = response.responseId;
-    List<Container> allContainers = response.containers;
+    ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
+    status.setApplicationId(applicationId);
+    status.setResponseId(lastResponseID);
+    
+    AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class);
+    allocateRequest.setApplicationStatus(status);
+    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
+    allocateRequest.addAllReleases(new ArrayList<Container>(release));
+    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AMResponse response = allocateResponse.getAMResponse(); 
+    lastResponseID = response.getResponseId();
+    List<Container> allContainers = response.getContainerList();
     ask.clear();
     release.clear();
 
@@ -369,15 +386,15 @@ implements ContainerAllocator {
         " recieved=" + allContainers.size());
     List<Container> allocatedContainers = new ArrayList<Container>();
     for (Container cont : allContainers) {
-      if (cont.state != ContainerState.COMPLETE) {
+      if (cont.getState() != ContainerState.COMPLETE) {
         allocatedContainers.add(cont);
         LOG.debug("Received Container :" + cont);
       } else {
         LOG.info("Received completed container " + cont);
-        TaskAttemptID attemptID = assignedMap.remove(cont.id);
+        TaskAttemptId attemptID = assignedMap.remove(cont.getId());
         if (attemptID == null) {
           LOG.error("Container complete event for unknown container id " + 
-              cont.id);
+              cont.getId());
         } else {
           //send the container completed event to Task attempt
           eventHandler.handle(new TaskAttemptEvent(attemptID, 
@@ -413,8 +430,8 @@ implements ContainerAllocator {
   private void assign(Priority priority, List<Container> allocatedContainers) {
     for (Iterator<Container> i=allocatedContainers.iterator(); i.hasNext();) {
       Container allocatedContainer = i.next();
-      String host = allocatedContainer.hostName.toString();
-      Resource capability = allocatedContainer.resource;
+      String host = allocatedContainer.getHostName();
+      Resource capability = allocatedContainer.getResource();
 
       LinkedList<ContainerRequestEvent> requestList = 
         localRequestsQueue.get(priority).get(capability);
@@ -455,15 +472,15 @@ implements ContainerAllocator {
 
         //send the container assigned event to Task attempt
         eventHandler.handle(new TaskAttemptContainerAssignedEvent(assigned
-            .getAttemptID(), allocatedContainer.id,
-            allocatedContainer.hostName.toString(),
-            allocatedContainer.containerToken));
+            .getAttemptID(), allocatedContainer.getId(),
+            allocatedContainer.getHostName(),
+            allocatedContainer.getContainerToken()));
 
-        assignedMap.put(allocatedContainer.id, assigned.getAttemptID());
+        assignedMap.put(allocatedContainer.getId(), assigned.getAttemptID());
 
         LOG.info("Assigned container (" + allocatedContainer + ") " +
             " to task " + assigned.getAttemptID() + " at priority " + priority + 
-            " on node " + allocatedContainer.hostName.toString());
+            " on node " + allocatedContainer.getHostName());
       }
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/StaticContainerAllocator.java Thu Mar 31 22:23:22 2011
@@ -33,9 +33,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.ContainerID;
 
 /**
  * Reads the static list of NodeManager from config file and allocate 
@@ -137,7 +138,7 @@ public class StaticContainerAllocator ex
       if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
         if (nextIndex < containerMgrList.size()) {
           String containerMgr = containerMgrList.get(nextIndex);
-          ContainerID containerID = generateContainerID();
+          ContainerId containerID = generateContainerID();
 
         context.getEventHandler().handle(
             new TaskAttemptContainerAssignedEvent(
@@ -147,10 +148,10 @@ public class StaticContainerAllocator ex
       }
     }
 
-    private ContainerID generateContainerID() {
-      ContainerID cId = new ContainerID();
-      cId.appID = context.getApplicationID();
-      cId.id = containerCount++;
+    private ContainerId generateContainerID() {
+      ContainerId cId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
+      cId.setAppId(context.getApplicationID());
+      cId.setId(containerCount++);
       return cId;
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Mar 31 22:23:22 2011
@@ -33,9 +33,13 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.Clock;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -47,10 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
 
 public class DefaultSpeculator extends AbstractService implements
     Speculator {
@@ -71,8 +72,8 @@ public class DefaultSpeculator extends A
 
   private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
 
-  private final ConcurrentMap<TaskID, Boolean> runningTasks
-      = new ConcurrentHashMap<TaskID, Boolean>();
+  private final ConcurrentMap<TaskId, Boolean> runningTasks
+      = new ConcurrentHashMap<TaskId, Boolean>();
 
   private final Map<Task, AtomicBoolean> pendingSpeculations
       = new ConcurrentHashMap<Task, AtomicBoolean>();
@@ -80,12 +81,12 @@ public class DefaultSpeculator extends A
   // These are the current needs, not the initial needs.  For each job, these
   //  record the number of attempts that exist and that are actively
   //  waiting for a container [as opposed to running or finished]
-  private final ConcurrentMap<JobID, AtomicInteger> mapContainerNeeds
-      = new ConcurrentHashMap<JobID, AtomicInteger>();
-  private final ConcurrentMap<JobID, AtomicInteger> reduceContainerNeeds
-      = new ConcurrentHashMap<JobID, AtomicInteger>();
+  private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds
+      = new ConcurrentHashMap<JobId, AtomicInteger>();
+  private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
+      = new ConcurrentHashMap<JobId, AtomicInteger>();
 
-  private final Set<TaskID> mayHaveSpeculated = new HashSet();
+  private final Set<TaskId> mayHaveSpeculated = new HashSet();
 
   private final Configuration conf;
   private AppContext context;
@@ -239,11 +240,11 @@ public class DefaultSpeculator extends A
 
   // This section contains the code that gets run for a SpeculatorEvent
 
-  private AtomicInteger containerNeed(TaskID taskID) {
-    JobID jobID = taskID.jobID;
-    TaskType taskType = taskID.taskType;
+  private AtomicInteger containerNeed(TaskId taskID) {
+    JobId jobID = taskID.getJobId();
+    TaskType taskType = taskID.getTaskType();
 
-    ConcurrentMap<JobID, AtomicInteger> relevantMap
+    ConcurrentMap<JobId, AtomicInteger> relevantMap
         = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
 
     AtomicInteger result = relevantMap.get(jobID);
@@ -289,9 +290,9 @@ public class DefaultSpeculator extends A
 
     String stateString = reportedStatus.stateString.toString();
 
-    TaskAttemptID attemptID = reportedStatus.id;
-    TaskID taskID = attemptID.taskID;
-    Job job = context.getJob(taskID.jobID);
+    TaskAttemptId attemptID = reportedStatus.id;
+    TaskId taskID = attemptID.getTaskId();
+    Job job = context.getJob(taskID.getJobId());
 
     if (job == null) {
       return;
@@ -335,10 +336,10 @@ public class DefaultSpeculator extends A
   //
   // All of these values are negative.  Any value that should be allowed to
   //  speculate is 0 or positive.
-  private long speculationValue(TaskID taskID, long now) {
-    Job job = context.getJob(taskID.jobID);
+  private long speculationValue(TaskId taskID, long now) {
+    Job job = context.getJob(taskID.getJobId());
     Task task = job.getTask(taskID);
-    Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
     long acceptableRuntime = Long.MIN_VALUE;
     long result = Long.MIN_VALUE;
 
@@ -349,7 +350,7 @@ public class DefaultSpeculator extends A
       }
     }
 
-    TaskAttemptID runningTaskAttemptID = null;
+    TaskAttemptId runningTaskAttemptID = null;
 
     int numberRunningAttempts = 0;
 
@@ -407,7 +408,7 @@ public class DefaultSpeculator extends A
   }
 
   //Add attempt to a given Task.
-  protected void addSpeculativeAttempt(TaskID taskID) {
+  protected void addSpeculativeAttempt(TaskId taskID) {
     System.out.println
         ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
     eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
@@ -433,10 +434,10 @@ public class DefaultSpeculator extends A
 
     long now = clock.getTime();
 
-    ConcurrentMap<JobID, AtomicInteger> containerNeeds
+    ConcurrentMap<JobId, AtomicInteger> containerNeeds
         = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
 
-    for (ConcurrentMap.Entry<JobID, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
+    for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
       // This race conditon is okay.  If we skip a speculation attempt we
       //  should have tried because the event that lowers the number of
       //  containers needed to zero hasn't come through, it will next time.
@@ -444,7 +445,7 @@ public class DefaultSpeculator extends A
       //  zero but increased due to a failure it's not too bad to launch one
       //  container prematurely.
       if (jobEntry.getValue().get() > 0) {
-        break;
+        continue;
       }
 
       int numberSpeculationsAlready = 0;
@@ -453,18 +454,18 @@ public class DefaultSpeculator extends A
       // loop through the tasks of the kind
       Job job = context.getJob(jobEntry.getKey());
 
-      Map<TaskID, Task> tasks = job.getTasks(type);
+      Map<TaskId, Task> tasks = job.getTasks(type);
 
       int numberAllowedSpeculativeTasks
           = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
                            PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
 
-      TaskID bestTaskID = null;
+      TaskId bestTaskID = null;
       long bestSpeculationValue = -1L;
 
       // this loop is potentially pricey.
       // TODO track the tasks that are potentially worth looking at
-      for (Map.Entry<TaskID, Task> taskEntry : tasks.entrySet()) {
+      for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
         long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
 
         if (mySpeculationValue == ALREADY_SPECULATING) {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Thu Mar 31 22:23:22 2011
@@ -20,15 +20,13 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
 
 /*
  * This estimator exponentially smooths the rate of progress vrs. wallclock
@@ -37,8 +35,8 @@ import org.apache.hadoop.mapreduce.v2.ap
  */
 public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
 
-  private final ConcurrentMap<TaskAttemptID, AtomicReference<EstimateVector>> estimates
-      = new ConcurrentHashMap<TaskAttemptID, AtomicReference<EstimateVector>>();
+  private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates
+      = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>();
 
   private SmoothedValue smoothedValue;
 
@@ -94,7 +92,7 @@ public class ExponentiallySmoothedTaskRu
   }
 
   private void incorporateReading
-      (TaskAttemptID attemptID, float newProgress, long newTime) {
+      (TaskAttemptId attemptID, float newProgress, long newTime) {
     AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
 
     if (vectorRef == null) {
@@ -119,7 +117,7 @@ public class ExponentiallySmoothedTaskRu
     }
   }
 
-  private EstimateVector getEstimateVector(TaskAttemptID attemptID) {
+  private EstimateVector getEstimateVector(TaskAttemptId attemptID) {
     AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
 
     if (vectorRef == null) {
@@ -145,7 +143,7 @@ public class ExponentiallySmoothedTaskRu
   }
 
   @Override
-  public long estimatedRuntime(TaskAttemptID id) {
+  public long estimatedRuntime(TaskAttemptId id) {
     Long startTime = startTimes.get(id);
 
     if (startTime == null) {
@@ -179,13 +177,13 @@ public class ExponentiallySmoothedTaskRu
   }
 
   @Override
-  public long runtimeEstimateVariance(TaskAttemptID id) {
+  public long runtimeEstimateVariance(TaskAttemptId id) {
     return -1L;
   }
 
   @Override
   public void updateAttempt(TaskAttemptStatus status, long timestamp) {
-    TaskAttemptID attemptID = status.id;
+    TaskAttemptId attemptID = status.id;
 
     float progress = status.progress;
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/LegacyTaskRuntimeEstimator.java Thu Mar 31 22:23:22 2011
@@ -22,15 +22,15 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
 
 
 
@@ -48,9 +48,9 @@ public class LegacyTaskRuntimeEstimator 
 
     String stateString = status.stateString.toString();
 
-    TaskAttemptID attemptID = status.id;
-    TaskID taskID = attemptID.taskID;
-    JobID jobID = taskID.jobID;
+    TaskAttemptId attemptID = status.id;
+    TaskId taskID = attemptID.getTaskId();
+    JobId jobID = taskID.getJobId();
     Job job = context.getJob(jobID);
 
     if (job == null) {
@@ -121,9 +121,9 @@ public class LegacyTaskRuntimeEstimator 
   }
 
   private long storedPerAttemptValue
-       (Map<TaskAttempt, AtomicLong> data, TaskAttemptID attemptID) {
-    TaskID taskID = attemptID.taskID;
-    JobID jobID = taskID.jobID;
+       (Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
+    TaskId taskID = attemptID.getTaskId();
+    JobId jobID = taskID.getJobId();
     Job job = context.getJob(jobID);
 
     Task task = job.getTask(taskID);
@@ -145,12 +145,12 @@ public class LegacyTaskRuntimeEstimator 
   }
 
   @Override
-  public long estimatedRuntime(TaskAttemptID attemptID) {
+  public long estimatedRuntime(TaskAttemptId attemptID) {
     return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
   }
 
   @Override
-  public long runtimeEstimateVariance(TaskAttemptID attemptID) {
+  public long runtimeEstimateVariance(TaskAttemptId attemptID) {
     return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/NullTaskRuntimesEngine.java Thu Mar 31 22:23:22 2011
@@ -19,10 +19,10 @@
 package org.apache.hadoop.mapreduce.v2.app.speculate;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
 
 
 /*
@@ -36,7 +36,7 @@ public class NullTaskRuntimesEngine impl
   }
 
   @Override
-  public long attemptEnrolledTime(TaskAttemptID attemptID) {
+  public long attemptEnrolledTime(TaskAttemptId attemptID) {
     return Long.MAX_VALUE;
   }
 
@@ -51,21 +51,21 @@ public class NullTaskRuntimesEngine impl
   }
 
   @Override
-  public long thresholdRuntime(TaskID id) {
+  public long thresholdRuntime(TaskId id) {
     return Long.MAX_VALUE;
   }
 
   @Override
-  public long estimatedRuntime(TaskAttemptID id) {
+  public long estimatedRuntime(TaskAttemptId id) {
     return -1L;
   }
   @Override
-  public long estimatedNewAttemptRuntime(TaskID id) {
+  public long estimatedNewAttemptRuntime(TaskId id) {
     return -1L;
   }
 
   @Override
-  public long runtimeEstimateVariance(TaskAttemptID id) {
+  public long runtimeEstimateVariance(TaskAttemptId id) {
     return -1L;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/SpeculatorEvent.java Thu Mar 31 22:23:22 2011
@@ -20,8 +20,8 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 
 public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
 
@@ -29,7 +29,7 @@ public class SpeculatorEvent extends Abs
   private TaskAttemptStatus reportedStatus;
 
   // valid for TASK_CONTAINER_NEED_UPDATE
-  private TaskID taskID;
+  private TaskId taskID;
   private int containersNeededChange;
 
 
@@ -39,7 +39,7 @@ public class SpeculatorEvent extends Abs
     this.reportedStatus = reportedStatus;
   }
 
-  public SpeculatorEvent(TaskAttemptID attemptID, boolean flag, long timestamp) {
+  public SpeculatorEvent(TaskAttemptId attemptID, boolean flag, long timestamp) {
     super(Speculator.EventType.ATTEMPT_START, timestamp);
     this.reportedStatus = new TaskAttemptStatus();
     this.reportedStatus.id = attemptID;
@@ -54,7 +54,7 @@ public class SpeculatorEvent extends Abs
    *  speculation wouldn't compete for containers with tasks which need
    *  to be run.
    */
-  public SpeculatorEvent(TaskID taskID, int containersNeededChange) {
+  public SpeculatorEvent(TaskId taskID, int containersNeededChange) {
     super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
     this.taskID = taskID;
     this.containersNeededChange = containersNeededChange;
@@ -68,7 +68,7 @@ public class SpeculatorEvent extends Abs
     return containersNeededChange;
   }
 
-  public TaskID getTaskID() {
+  public TaskId getTaskID() {
     return taskID;
   }
 }
\ No newline at end of file



Mime
View raw message