hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1445871 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Date Wed, 13 Feb 2013 19:20:32 GMT
Author: sseth
Date: Wed Feb 13 19:20:31 2013
New Revision: 1445871

URL: http://svn.apache.org/r1445871
Log:
MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection of the best attempt
for a task. Contributed by Jason Lowe.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1445871&r1=1445870&r2=1445871&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb 13 19:20:31 2013
@@ -169,6 +169,9 @@ Release 2.0.4-beta - UNRELEASED
 
     MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu)
 
+    MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
+    of the best attempt for a task. (Jason Lowe via sseth)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1445871&r1=1445870&r2=1445871&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Feb 13 19:20:31 2013
@@ -539,6 +539,10 @@ public abstract class TaskImpl implement
   //select the nextAttemptNumber with best progress
   // always called inside the Read Lock
   private TaskAttempt selectBestAttempt() {
+    if (successfulAttempt != null) {
+      return attempts.get(successfulAttempt);
+    }
+
     float progress = 0f;
     TaskAttempt result = null;
     for (TaskAttempt at : attempts.values()) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1445871&r1=1445870&r2=1445871&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Wed Feb 13 19:20:31 2013
@@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -52,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -143,6 +145,7 @@ public class TestTaskImpl {
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
     private TaskType taskType;
+    private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -178,7 +181,15 @@ public class TestTaskImpl {
     public TaskAttemptState getState() {
       return state;
     }
-    
+
+    @Override
+    public Counters getCounters() {
+      return attemptCounters;
+    }
+
+    public void setCounters(Counters counters) {
+      attemptCounters = counters;
+    }
   }
   
   private class MockTask extends Task {
@@ -687,4 +698,49 @@ public class TestTaskImpl {
         TaskEventType.T_ATTEMPT_KILLED));
     assertEquals(TaskState.FAILED, mockTask.getState());
   }
+
+  @Test
+  public void testCountersWithSpeculation() {
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, appContext, TaskType.MAP) {
+          @Override
+          protected int getMaxAttempts() {
+            return 1;
+          }
+    };
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+    MockTaskAttemptImpl baseAttempt = getLastAttempt();
+
+    // add a speculative attempt
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+    MockTaskAttemptImpl specAttempt = getLastAttempt();
+    assertEquals(2, taskAttempts.size());
+
+    Counters specAttemptCounters = new Counters();
+    Counter cpuCounter = specAttemptCounters.findCounter(
+        TaskCounter.CPU_MILLISECONDS);
+    cpuCounter.setValue(1000);
+    specAttempt.setCounters(specAttemptCounters);
+
+    // have the spec attempt succeed, with the base attempt reaching 1.0 progress as well
+    commitTaskAttempt(specAttempt.getAttemptId());
+    specAttempt.setProgress(1.0f);
+    specAttempt.setState(TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+    baseAttempt.setProgress(1.0f);
+
+    Counters taskCounters = mockTask.getCounters();
+    assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
+  }
 }



Mime
View raw message