Author: schen
Date: Mon Feb 28 22:40:02 2011
New Revision: 1075569
URL: http://svn.apache.org/viewvc?rev=1075569&view=rev
Log:
MAPREDUCE-2206. The task-cleanup tasks should be optional. (schen)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Feb 28 22:40:02 2011
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
MAPREDUCE-1927. Unit test for HADOOP-6835 (concatenated gzip support).
(Greg Roelofs via tomwhite)
+ MAPREDUCE-2206. The task-cleanup tasks should be optional. (schen)
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Feb 28 22:40:02 2011
@@ -146,6 +146,7 @@ public class JobInProgress {
private volatile boolean jobKilled = false;
private volatile boolean jobFailed = false;
private final boolean jobSetupCleanupNeeded;
+ private final boolean taskCleanupNeeded;
JobPriority priority = JobPriority.NORMAL;
protected JobTracker jobtracker;
@@ -360,6 +361,8 @@ public class JobInProgress {
MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
this.jobSetupCleanupNeeded = conf.getBoolean(
MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+ this.taskCleanupNeeded = conf.getBoolean(
+ MRJobConfig.TASK_CLEANUP_NEEDED, true);
if (tracker != null) { // Some mock tests have null tracker
this.jobHistory = tracker.getJobHistory();
}
@@ -369,6 +372,7 @@ public class JobInProgress {
JobInProgress(JobConf conf) {
restartCount = 0;
jobSetupCleanupNeeded = false;
+ taskCleanupNeeded = true;
this.memoryPerMap = conf.getMemoryForMapTask();
this.memoryPerReduce = conf.getMemoryForReduceTask();
@@ -449,6 +453,7 @@ public class JobInProgress {
numMapTasks + numReduceTasks + 10);
JobContext jobContext = new JobContextImpl(conf, jobId);
this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
+ this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
// Construct the jobACLs
status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
@@ -1078,12 +1083,12 @@ public class JobInProgress {
status.setRunState(TaskStatus.State.KILLED);
}
- // If the job is complete and a task has just reported its
- // state as FAILED_UNCLEAN/KILLED_UNCLEAN,
+ // If the job is complete or task-cleanup is switched off
+ // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN,
// make the task's state FAILED/KILLED without launching cleanup attempt.
// Note that if task is already a cleanup attempt,
// we don't change the state to make sure the task gets a killTaskAction
- if ((this.isComplete() || jobFailed || jobKilled) &&
+ if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) &&
!tip.isCleanupAttempt(taskid)) {
if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
status.setRunState(TaskStatus.State.FAILED);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Mon Feb 28 22:40:02 2011
@@ -182,6 +182,13 @@ public interface JobContext extends MRJo
* @return boolean
*/
public boolean getJobSetupCleanupNeeded();
+
+ /**
+ * Get whether task-cleanup is needed for the job
+ *
+ * @return boolean
+ */
+ public boolean getTaskCleanupNeeded();
/**
* Get whether the task profiling is enabled.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Feb 28 22:40:02 2011
@@ -40,6 +40,8 @@ public interface MRJobConfig {
public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";
+ public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
+
public static final String JAR = "mapreduce.job.jar";
public static final String ID = "mapreduce.job.id";
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon Feb 28 22:40:02 2011
@@ -198,6 +198,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN
}
@Override
+ public boolean getTaskCleanupNeeded() {
+ return base.getTaskCleanupNeeded();
+ }
+
+ @Override
public Path[] getLocalCacheArchives() throws IOException {
return base.getLocalCacheArchives();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon Feb 28 22:40:02 2011
@@ -191,6 +191,11 @@ class ChainReduceContextImpl<KEYIN, VALU
}
@Override
+ public boolean getTaskCleanupNeeded() {
+ return base.getTaskCleanupNeeded();
+ }
+
+ @Override
public Path[] getLocalCacheArchives() throws IOException {
return base.getLocalCacheArchives();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon Feb 28 22:40:02 2011
@@ -200,6 +200,11 @@ public class WrappedMapper<KEYIN, VALUEI
}
@Override
+ public boolean getTaskCleanupNeeded() {
+ return mapContext.getTaskCleanupNeeded();
+ }
+
+ @Override
public Path[] getLocalCacheArchives() throws IOException {
return mapContext.getLocalCacheArchives();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon Feb 28 22:40:02 2011
@@ -193,6 +193,11 @@ public class WrappedReducer<KEYIN, VALUE
}
@Override
+ public boolean getTaskCleanupNeeded() {
+ return reduceContext.getTaskCleanupNeeded();
+ }
+
+ @Override
public Path[] getLocalCacheArchives() throws IOException {
return reduceContext.getLocalCacheArchives();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Mon Feb 28 22:40:02 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
@@ -263,7 +264,16 @@ public class JobContextImpl implements J
* @return boolean
*/
public boolean getJobSetupCleanupNeeded() {
- return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
+ return conf.getBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, true);
+ }
+
+ /**
+ * Get whether task-cleanup is needed for the job
+ *
+ * @return boolean
+ */
+ public boolean getTaskCleanupNeeded() {
+ return conf.getBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, true);
}
/**
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=1075569&r1=1075568&r2=1075569&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Mon Feb 28 22:40:02 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskType;
@@ -125,7 +126,8 @@ public class TestTaskFail extends TestCa
}
private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
- TaskStatus ts, boolean isCleanup, JobTracker jt)
+ TaskStatus ts, boolean isCleanup,
+ boolean containsCleanupLog, JobTracker jt)
throws IOException {
assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
assertTrue(ts != null);
@@ -142,11 +144,12 @@ public class TestTaskFail extends TestCa
"&filter=STDERR";
assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
.getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
- if (!isCleanup) {
+ if (containsCleanupLog) {
// validate task logs: tasklog should contain both task logs
// and cleanup logs
assertTrue(log.contains(cleanupLog));
- } else {
+ }
+ if (isCleanup) {
// validate tasklogs for cleanup attempt
log = MapReduceTestUtil.readTaskLog(
TaskLog.LogName.STDERR, attemptId, true);
@@ -169,7 +172,7 @@ public class TestTaskFail extends TestCa
}
}
- private void validateJob(RunningJob job, JobTracker jt)
+ private void validateJob(RunningJob job, JobTracker jt, boolean cleanupNeeded)
throws IOException {
assertEquals(JobStatus.SUCCEEDED, job.getJobState());
@@ -181,19 +184,21 @@ public class TestTaskFail extends TestCa
new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
TaskInProgress tip = jt.getTip(attemptId.getTaskID());
TaskStatus ts = jt.getTaskStatus(attemptId);
- validateAttempt(tip, attemptId, ts, false, jt);
+ // task logs will contain cleanup message because the task is failed by
+ // throwing IOException
+ validateAttempt(tip, attemptId, ts, false, true, jt);
attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
// this should be cleanup attempt since the second attempt fails
// with System.exit
ts = jt.getTaskStatus(attemptId);
- validateAttempt(tip, attemptId, ts, true, jt);
+ validateAttempt(tip, attemptId, ts, cleanupNeeded, false, jt);
attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
// this should be cleanup attempt since the third attempt fails
// with Error
ts = jt.getTaskStatus(attemptId);
- validateAttempt(tip, attemptId, ts, true, jt);
+ validateAttempt(tip, attemptId, ts, cleanupNeeded, false, jt);
}
public void testWithDFS() throws IOException {
@@ -219,18 +224,25 @@ public class TestTaskFail extends TestCa
jobConf.setOutputCommitter(CommitterWithLogs.class);
RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
- validateJob(rJob, jt);
+ validateJob(rJob, jt, true);
// launch job with fail tasks and fail-cleanups
fileSys.delete(outDir, true);
jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
- validateJob(rJob, jt);
+ validateJob(rJob, jt, true);
fileSys.delete(outDir, true);
jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
- validateJob(rJob, jt);
+ validateJob(rJob, jt, true);
+ // launch job with task-cleanup switched off
+ fileSys.delete(outDir, true);
+ jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
+ jobConf.setBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, false);
+ rJob = launchJob(jobConf, inDir, outDir, input);
+ rJob.waitForCompletion();
+ validateJob(rJob, jt, false);
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown(); }
|