Author: sseth
Date: Wed Feb 13 19:23:50 2013
New Revision: 1445873
URL: http://svn.apache.org/r1445873
Log:
merge MAPREDUCE-5000 from trunk. Fixes getCounters when speculating by fixing the selection
of the best attempt for a task. Contributed by Jason Lowe.
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1445873&r1=1445872&r2=1445873&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Feb 13 19:23:50 2013
@@ -17,6 +17,9 @@ Release 2.0.4-beta - UNRELEASED
MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu)
+ MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
+ of the best attempt for a task. (Jason Lowe via sseth)
+
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1445873&r1=1445872&r2=1445873&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Feb 13 19:23:50 2013
@@ -539,6 +539,10 @@ public abstract class TaskImpl implement
//select the nextAttemptNumber with best progress
// always called inside the Read Lock
private TaskAttempt selectBestAttempt() {
+ if (successfulAttempt != null) {
+ return attempts.get(successfulAttempt);
+ }
+
float progress = 0f;
TaskAttempt result = null;
for (TaskAttempt at : attempts.values()) {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1445873&r1=1445872&r2=1445873&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Wed Feb 13 19:23:50 2013
@@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -52,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -143,6 +145,7 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
private TaskType taskType;
+ private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -178,7 +181,15 @@ public class TestTaskImpl {
public TaskAttemptState getState() {
return state;
}
-
+
+ @Override
+ public Counters getCounters() {
+ return attemptCounters;
+ }
+
+ public void setCounters(Counters counters) {
+ attemptCounters = counters;
+ }
}
private class MockTask extends Task {
@@ -687,4 +698,49 @@ public class TestTaskImpl {
TaskEventType.T_ATTEMPT_KILLED));
assertEquals(TaskState.FAILED, mockTask.getState());
}
+
+ @Test
+ public void testCountersWithSpeculation() {
+ mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+ remoteJobConfFile, conf, taskAttemptListener, jobToken,
+ credentials, clock,
+ completedTasksFromPreviousRun, startCount,
+ metrics, appContext, TaskType.MAP) {
+ @Override
+ protected int getMaxAttempts() {
+ return 1;
+ }
+ };
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ updateLastAttemptState(TaskAttemptState.RUNNING);
+ MockTaskAttemptImpl baseAttempt = getLastAttempt();
+
+ // add a speculative attempt
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ updateLastAttemptState(TaskAttemptState.RUNNING);
+ MockTaskAttemptImpl specAttempt = getLastAttempt();
+ assertEquals(2, taskAttempts.size());
+
+ Counters specAttemptCounters = new Counters();
+ Counter cpuCounter = specAttemptCounters.findCounter(
+ TaskCounter.CPU_MILLISECONDS);
+ cpuCounter.setValue(1000);
+ specAttempt.setCounters(specAttemptCounters);
+
+ // have the spec attempt succeed but second attempt at 1.0 progress as well
+ commitTaskAttempt(specAttempt.getAttemptId());
+ specAttempt.setProgress(1.0f);
+ specAttempt.setState(TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+ baseAttempt.setProgress(1.0f);
+
+ Counters taskCounters = mockTask.getCounters();
+ assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
+ }
}
|