Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar 8 05:54:13 2011
@@ -72,10 +72,8 @@ public class ReduceTask extends Task {
private CompressionCodec codec;
-
{
getProgress().setStatus("reduce");
- setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
private Progress copyPhase;
@@ -121,14 +119,21 @@ public class ReduceTask extends Task {
public ReduceTask() {
super();
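+    // taskStatus creation moved here from Task's default ctor so that each
+    // subclass can supply its own status type (Map/Reduce/UberTaskStatus)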
+ this.taskStatus = new ReduceTaskStatus();
}
public ReduceTask(String jobFile, TaskAttemptID taskId,
int partition, int numMaps, int numSlotsRequired) {
super(jobFile, taskId, partition, numSlotsRequired);
this.numMaps = numMaps;
+ this.taskStatus = new ReduceTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
+ TaskStatus.State.UNASSIGNED,
+ "", "", "", TaskStatus.Phase.SHUFFLE,
+ getCounters());
}
-
+
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
@@ -151,6 +156,45 @@ public class ReduceTask extends Task {
return false;
}
+ /**
+   * Is this really a combo-task masquerading as a plain ReduceTask?
+   * Decidedly not.
+ */
+ @Override
+ public boolean isUberTask() {
+ return false;
+ }
+
+ /**
+ * Allow UberTask (or, potentially, JobInProgress or others) to set up a
+ * deeper Progress hierarchy even if run() is skipped. If setProgress()
+   * is also needed, it should be called <i>before</i> createPhase() or else
+ * the sub-phases created here will be wiped out.
+ */
+ void createPhase(TaskStatus.Phase phaseType, String status) {
+ if (phaseType == TaskStatus.Phase.SHUFFLE) {
+ copyPhase = getProgress().addPhase(status);
+ } else if (phaseType == TaskStatus.Phase.SORT) {
+ sortPhase = getProgress().addPhase(status);
+ } else /* TaskStatus.Phase.REDUCE */ {
+ reducePhase = getProgress().addPhase(status);
+ }
+ }
+
+ /**
+ * Allow UberTask to traverse the deeper Progress hierarchy in case run() is
+ * skipped.
+ */
+ void completePhase(TaskStatus.Phase phaseType) {
+ if (phaseType == TaskStatus.Phase.SHUFFLE) {
+ copyPhase.complete();
+ } else if (phaseType == TaskStatus.Phase.SORT) {
+ sortPhase.complete();
+ } else /* TaskStatus.Phase.REDUCE */ {
+ reducePhase.complete();
+ }
+ }
+
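A minimal usage sketch of these hooks (hypothetical driver code in the same package, mirroring what UberTask.runUberReducer() does later in this commit); per the javadoc above, setProgress() must run before createPhase():

    // 'reduce' is a ReduceTask; 'parent' is a second-level node taken from
    // the caller's own Progress tree (e.g., via getProgress().addPhase())
    reduce.setProgress(parent);                          // re-root first...
    reduce.createPhase(TaskStatus.Phase.SORT, "sort");   // ...then add sub-phases
    reduce.createPhase(TaskStatus.Phase.REDUCE, "reduce");
    // if run() is skipped, the caller marks sub-phases done itself:
    reduce.completePhase(TaskStatus.Phase.SORT);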
public int getNumMaps() { return numMaps; }
/**
@@ -177,37 +221,39 @@ public class ReduceTask extends Task {
}
// Get the input files for the reducer.
- private Path[] getMapFiles(FileSystem fs, boolean isLocal)
+ static Path[] getMapFiles(ReduceTask reduce, FileSystem fs, boolean isLocal)
throws IOException {
List<Path> fileList = new ArrayList<Path>();
if (isLocal) {
// for local jobs
- for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i));
+ for (int i = 0; i < reduce.numMaps; ++i) {
+ fileList.add(reduce.mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
- for (FileStatus filestatus : mapOutputFilesOnDisk) {
+ for (FileStatus filestatus : reduce.mapOutputFilesOnDisk) {
fileList.add(filestatus.getPath());
}
}
return fileList.toArray(new Path[0]);
}
- private class ReduceValuesIterator<KEY,VALUE>
+ private static class ReduceValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
- public ReduceValuesIterator (RawKeyValueIterator in,
+ ReduceTask reduce;
+ public ReduceValuesIterator (ReduceTask reduce, RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass,
Configuration conf, Progressable reporter)
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
+ this.reduce = reduce;
}
@Override
public VALUE next() {
- reduceInputValueCounter.increment(1);
+ reduce.reduceInputValueCounter.increment(1);
return moveToNext();
}
@@ -216,12 +262,13 @@ public class ReduceTask extends Task {
}
public void informReduceProgress() {
- reducePhase.set(super.in.getProgress().getProgress()); // update progress
+ // update progress:
+ reduce.reducePhase.set(super.in.getProgress().getProgress());
reporter.progress();
}
}
- private class SkippingReduceValuesIterator<KEY,VALUE>
+ private static class SkippingReduceValuesIterator<KEY,VALUE>
extends ReduceValuesIterator<KEY,VALUE> {
private SkipRangeIterator skipIt;
private TaskUmbilicalProtocol umbilical;
@@ -234,26 +281,27 @@ public class ReduceTask extends Task {
private boolean toWriteSkipRecs;
private boolean hasNext;
private TaskReporter reporter;
-
- public SkippingReduceValuesIterator(RawKeyValueIterator in,
+
+ public SkippingReduceValuesIterator(ReduceTask reduce,
+ RawKeyValueIterator in,
RawComparator<KEY> comparator, Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
TaskUmbilicalProtocol umbilical) throws IOException {
- super(in, comparator, keyClass, valClass, conf, reporter);
+ super(reduce, in, comparator, keyClass, valClass, conf, reporter);
this.umbilical = umbilical;
- this.skipGroupCounter =
+ this.skipGroupCounter =
reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
- this.skipRecCounter =
+ this.skipRecCounter =
reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
- this.toWriteSkipRecs = toWriteSkipRecs() &&
+ this.toWriteSkipRecs = reduce.toWriteSkipRecs() &&
SkipBadRecords.getSkipOutputPath(conf)!=null;
this.keyClass = keyClass;
this.valClass = valClass;
this.reporter = reporter;
- skipIt = getSkipRanges().skipRangeIterator();
+ skipIt = reduce.getSkipRanges().skipRangeIterator();
mayBeSkip();
}
-
+
public void nextKey() throws IOException {
super.nextKey();
mayBeSkip();
@@ -292,16 +340,16 @@ public class ReduceTask extends Task {
}
skipGroupCounter.increment(skip);
skipRecCounter.increment(skipRec);
- reportNextRecordRange(umbilical, grpIndex);
+ reduce.reportNextRecordRange(umbilical, grpIndex);
}
@SuppressWarnings("unchecked")
private void writeSkippedRec(KEY key, VALUE value) throws IOException{
if(skipWriter==null) {
- Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
- Path skipFile = new Path(skipDir, getTaskID().toString());
+ Path skipDir = SkipBadRecords.getSkipOutputPath(reduce.conf);
+ Path skipFile = new Path(skipDir, reduce.getTaskID().toString());
skipWriter = SequenceFile.createWriter(
- skipFile.getFileSystem(conf), conf, skipFile,
+ skipFile.getFileSystem(reduce.conf), reduce.conf, skipFile,
keyClass, valClass,
CompressionType.BLOCK, reporter);
}
@@ -363,8 +411,8 @@ public class ReduceTask extends Task {
} else {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), codec,
- getMapFiles(rfs, true),
+ job.getMapOutputValueClass(), codec,
+ getMapFiles(this, rfs, true),
!conf.getKeepFailedTaskFiles(),
job.getInt(JobContext.IO_SORT_FACTOR, 100),
new Path(getTaskID().toString()),
@@ -382,18 +430,40 @@ public class ReduceTask extends Task {
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
- runNewReducer(job, umbilical, reporter, rIter, comparator,
+ runNewReducer(this, job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
- runOldReducer(job, umbilical, reporter, rIter, comparator,
+ runOldReducer(this, job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
done(umbilical, reporter);
}
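+  /**
+   * Replaces the anonymous OutputCollector formerly built inside
+   * runOldReducer() so that the method, now static, needs no implicit
+   * ReduceTask.this reference.
+   */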
+ private static class WrappedOutputCollector<OUTKEY, OUTVALUE>
+ implements OutputCollector<OUTKEY, OUTVALUE> {
+ RecordWriter<OUTKEY, OUTVALUE> out;
+ TaskReporter reporter;
+ Counters.Counter reduceOutputCounter;
+ public WrappedOutputCollector(ReduceTask reduce,
+ RecordWriter<OUTKEY, OUTVALUE> out,
+ TaskReporter reporter) {
+ this.out = out;
+ this.reporter = reporter;
+ this.reduceOutputCounter = reduce.reduceOutputCounter;
+ }
+
+ public void collect(OUTKEY key, OUTVALUE value)
+ throws IOException {
+ out.write(key, value);
+ reduceOutputCounter.increment(1);
+ // indicate that progress update needs to be sent
+ reporter.progress();
+ }
+ }
+
@SuppressWarnings("unchecked")
- private <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runOldReducer(JobConf job,
+ static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runOldReducer(ReduceTask reduce, JobConf job,
TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
@@ -403,7 +473,7 @@ public class ReduceTask extends Task {
Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
// make output collector
- String finalName = getOutputName(getPartition());
+ String finalName = getOutputName(reduce.getPartition());
FileSystem fs = FileSystem.get(job);
@@ -411,15 +481,7 @@ public class ReduceTask extends Task {
job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
OutputCollector<OUTKEY,OUTVALUE> collector =
- new OutputCollector<OUTKEY,OUTVALUE>() {
- public void collect(OUTKEY key, OUTVALUE value)
- throws IOException {
- out.write(key, value);
- reduceOutputCounter.increment(1);
- // indicate that progress update needs to be sent
- reporter.progress();
- }
- };
+ new WrappedOutputCollector<OUTKEY, OUTVALUE>(reduce, out, reporter);
// apply reduce function
try {
@@ -427,16 +489,16 @@ public class ReduceTask extends Task {
boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
SkipBadRecords.getAutoIncrReducerProcCount(job);
- ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ?
- new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter,
+ ReduceValuesIterator<INKEY,INVALUE> values = reduce.isSkipping() ?
+ new SkippingReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
comparator, keyClass, valueClass,
job, reporter, umbilical) :
- new ReduceValuesIterator<INKEY,INVALUE>(rIter,
+ new ReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
job.getOutputValueGroupingComparator(), keyClass, valueClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
- reduceInputKeyCounter.increment(1);
+ reduce.reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
@@ -449,7 +511,7 @@ public class ReduceTask extends Task {
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
- //End of clean up.
+ //End of cleanup.
} catch (IOException ioe) {
try {
reducer.close();
@@ -487,9 +549,38 @@ public class ReduceTask extends Task {
}
}
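+  /**
+   * Replaces the anonymous RawKeyValueIterator formerly built inside
+   * runNewReducer() so that the method, now static, needs no implicit
+   * ReduceTask.this reference; next() also forwards progress to the reporter.
+   */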
+ private static class WrappedRawKeyValueIterator implements RawKeyValueIterator {
+ ReduceTask reduce;
+ TaskReporter reporter;
+ RawKeyValueIterator rawIter;
+ public WrappedRawKeyValueIterator(ReduceTask reduce, TaskReporter reporter,
+ RawKeyValueIterator rawIter) {
+ this.reduce = reduce;
+ this.rawIter = rawIter;
+ this.reporter = reporter;
+ }
+ public void close() throws IOException {
+ rawIter.close();
+ }
+ public DataInputBuffer getKey() throws IOException {
+ return rawIter.getKey();
+ }
+ public Progress getProgress() {
+ return rawIter.getProgress();
+ }
+ public DataInputBuffer getValue() throws IOException {
+ return rawIter.getValue();
+ }
+ public boolean next() throws IOException {
+ boolean ret = rawIter.next();
+ reporter.setProgress(rawIter.getProgress().getProgress());
+ return ret;
+ }
+ }
+
@SuppressWarnings("unchecked")
- private <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runNewReducer(JobConf job,
+ static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runNewReducer(final ReduceTask reduce, JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
@@ -498,49 +589,29 @@ public class ReduceTask extends Task {
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
+ org.apache.hadoop.mapreduce.TaskAttemptID reduceId = reduce.getTaskID();
// wrap value iterator to report progress.
final RawKeyValueIterator rawIter = rIter;
- rIter = new RawKeyValueIterator() {
- public void close() throws IOException {
- rawIter.close();
- }
- public DataInputBuffer getKey() throws IOException {
- return rawIter.getKey();
- }
- public Progress getProgress() {
- return rawIter.getProgress();
- }
- public DataInputBuffer getValue() throws IOException {
- return rawIter.getValue();
- }
- public boolean next() throws IOException {
- boolean ret = rawIter.next();
- reporter.setProgress(rawIter.getProgress().getProgress());
- return ret;
- }
- };
+ rIter = new WrappedRawKeyValueIterator(reduce, reporter, rawIter);
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
+ new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, reduceId);
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
- outputFormat.getRecordWriter(taskContext);
+ reduce.outputFormat.getRecordWriter(taskContext);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
- new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
- job.setBoolean("mapred.skip.on", isSkipping());
- job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
- org.apache.hadoop.mapreduce.Reducer.Context
- reducerContext = createReduceContext(reducer, job, getTaskID(),
- rIter, reduceInputKeyCounter,
- reduceInputValueCounter,
- trackedRW,
- committer,
- reporter, comparator, keyClass,
- valueClass);
+ new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduce.reduceOutputCounter);
+ job.setBoolean(JobContext.SKIP_RECORDS, reduce.isSkipping());
+ org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+ createReduceContext(reducer, job, reduceId, rIter,
+ reduce.reduceInputKeyCounter,
+ reduce.reduceInputValueCounter,
+ trackedRW, reduce.committer, reporter,
+ comparator, keyClass, valueClass);
reducer.run(reducerContext);
output.close(reducerContext);
}
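These methods become static, with an explicit ReduceTask parameter, so that another driver can run a ReduceTask it constructed itself. A hedged sketch of the intended call pattern (mirroring the static MapTask.runNewMapper()/runOldMapper() calls UberTask makes below; the truncated tail of UberTask.runUberReducer() presumably does the equivalent):

    ReduceTask reduce = new ReduceTask(getJobFile(), reduceId, 0, numMaps, 1);
    // ... localize the conf, re-root the Progress tree, build an rIter over
    // the renamed map outputs ...
    if (localConf.getUseNewReducer()) {
      ReduceTask.runNewReducer(reduce, localConf, umbilical, subReporter,
                               rIter, comparator, keyClass, valueClass);
    } else {
      ReduceTask.runOldReducer(reduce, localConf, umbilical, subReporter,
                               rIter, comparator, keyClass, valueClass);
    }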
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Mar 8 05:54:13 2011
@@ -54,6 +54,11 @@ class ReduceTaskStatus extends TaskStatu
}
@Override
+ public boolean getIsUber() {
+ return false;
+ }
+
+ @Override
void setFinishTime(long finishTime) {
if (shuffleFinishTime == 0) {
this.shuffleFinishTime = finishTime;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 05:54:13 2011
@@ -118,6 +118,7 @@ abstract public class Task implements Wr
private String jobFile; // job configuration file
private String user; // user running the job
private TaskAttemptID taskId; // unique, includes job id
+ private TaskAttemptID taskIdForUmbilical; // same, or uber's if subtask
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
protected JobStatus.State jobRunStateForCleanup;
@@ -159,8 +160,8 @@ abstract public class Task implements Wr
////////////////////////////////////////////
public Task() {
- taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
+ taskIdForUmbilical = taskId;
spilledRecordsCounter =
counters.findCounter(TaskCounter.SPILLED_RECORDS);
failedShuffleCounter =
@@ -174,17 +175,10 @@ abstract public class Task implements Wr
int numSlotsRequired) {
this.jobFile = jobFile;
this.taskId = taskId;
+ this.taskIdForUmbilical = taskId;
this.partition = partition;
this.numSlotsRequired = numSlotsRequired;
- this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
- 0.0f, numSlotsRequired,
- TaskStatus.State.UNASSIGNED,
- "", "", "",
- isMapTask() ?
- TaskStatus.Phase.MAP :
- TaskStatus.Phase.SHUFFLE,
- counters);
spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
mergedMapOutputsCounter =
@@ -197,7 +191,12 @@ abstract public class Task implements Wr
////////////////////////////////////////////
public void setJobFile(String jobFile) { this.jobFile = jobFile; }
public String getJobFile() { return jobFile; }
+
public TaskAttemptID getTaskID() { return taskId; }
+ public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) {
+ this.taskIdForUmbilical = taskIdForUmbilical;
+ }
+
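The caller this hook is designed for is UberTask (see runUberMapTasks() in this same commit), which points each subtask's umbilical traffic at its own attempt ID:

    MapTask map = new MapTask(getJobFile(), mapIds[j], j, splits[j], 1);
    // report to the TaskTracker under the uber attempt's ID, not the subtask's:
    map.setTaskIdForUmbilical(getTaskID());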
public int getNumSlotsRequired() {
return numSlotsRequired;
}
@@ -269,7 +268,7 @@ abstract public class Task implements Wr
/**
* Report a fatal error to the parent (task) tracker.
*/
- protected void reportFatalError(TaskAttemptID id, Throwable throwable,
+ protected void reportFatalError(Throwable throwable,
String logMsg) {
LOG.fatal(logMsg);
Throwable tCause = throwable.getCause();
@@ -277,7 +276,7 @@ abstract public class Task implements Wr
? StringUtils.stringifyException(throwable)
: StringUtils.stringifyException(tCause);
try {
- umbilical.fatalError(id, cause);
+ umbilical.fatalError(taskIdForUmbilical, cause);
} catch (IOException ioe) {
LOG.fatal("Failed to contact the tasktracker", ioe);
System.exit(-1);
@@ -389,6 +388,14 @@ abstract public class Task implements Wr
this.user = user;
}
+ /**
+ * Return the task's MapOutputFile instance.
+ * @return the task's MapOutputFile instance
+ */
+ MapOutputFile getMapOutputFile() {
+ return mapOutputFile;
+ }
+
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -396,6 +403,7 @@ abstract public class Task implements Wr
public void write(DataOutput out) throws IOException {
Text.writeString(out, jobFile);
taskId.write(out);
+ taskIdForUmbilical.write(out);
out.writeInt(partition);
out.writeInt(numSlotsRequired);
taskStatus.write(out);
@@ -414,6 +422,7 @@ abstract public class Task implements Wr
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
+ taskIdForUmbilical = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
@@ -465,7 +474,7 @@ abstract public class Task implements Wr
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
- private transient Progress taskProgress = new Progress();
+ private transient Progress taskProgress = new Progress(); //GRR Q: why transient? lose entire tree every time serialize??
// Current counters
private transient Counters counters = new Counters();
@@ -475,6 +484,21 @@ abstract public class Task implements Wr
public abstract boolean isMapTask();
+ /**
+ * Is this really a combo-task masquerading as a plain MapTask?
+ */
+ public abstract boolean isUberTask();
+
+ /**
+ * This setter allows one to incorporate Tasks into multiple levels of
+ * a Progress addPhase()-generated hierarchy (i.e., not always the root
+ * node), which in turn allows Progress to handle all details of progress
+ * aggregation for an UberTask or even a whole job.
+ */
+ protected void setProgress(Progress progress) {
+ taskProgress = progress;
+ }
+
public Progress getProgress() { return taskProgress; }
public void initialize(JobConf job, JobID id,
@@ -518,7 +542,7 @@ abstract public class Task implements Wr
resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
}
}
-
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
protected class TaskReporter
@@ -561,6 +585,7 @@ abstract public class Task implements Wr
// indicate that progress update needs to be sent
setProgressFlag();
}
+ // FIXME? why isn't this deprecated in favor of public setProgressFlag()?
public void progress() {
// indicate that progress update needs to be sent
setProgressFlag();
@@ -603,15 +628,16 @@ abstract public class Task implements Wr
}
public InputSplit getInputSplit() throws UnsupportedOperationException {
if (split == null) {
- throw new UnsupportedOperationException("Input only available on map");
+ throw new UnsupportedOperationException("Input available only on map");
} else {
return split;
}
}
/**
- * The communication thread handles communication with the parent (Task Tracker).
- * It sends progress updates if progress has been made or if the task needs to
- * let the parent know that it's alive. It also pings the parent to see if it's alive.
+ * The communication thread handles communication with the parent
+ * (TaskTracker). It sends progress updates if progress has been made or
+ * if the task needs to let the parent know that it's alive. It also pings
+ * the parent to see if it's alive.
*/
public void run() {
final int MAX_RETRIES = 3;
@@ -647,8 +673,8 @@ abstract public class Task implements Wr
taskFound = umbilical.ping(taskId);
}
- // if Task Tracker is not aware of our task ID (probably because it died and
- // came back up), kill ourselves
+ // if TaskTracker is not aware of our task ID (probably because it
+ // died and came back up), kill ourselves
if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
@@ -682,7 +708,7 @@ abstract public class Task implements Wr
}
}
}
-
+
/**
* Reports the next executing record range to TaskTracker.
*
@@ -701,7 +727,7 @@ abstract public class Task implements Wr
if (LOG.isDebugEnabled()) {
LOG.debug("sending reportNextRecordRange " + range);
}
- umbilical.reportNextRecordRange(taskId, range);
+ umbilical.reportNextRecordRange(taskIdForUmbilical, range);
}
/**
@@ -837,8 +863,13 @@ abstract public class Task implements Wr
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
- LOG.info("Task:" + taskId + " is done."
- + " And is in the process of commiting");
+ if (isUberTask()) {
+ LOG.info("UberTask:" + taskIdForUmbilical + " subtask:" + taskId
+ + "is done and is in the process of committing.");
+ } else {
+ LOG.info("Task:" + taskId
+ + "is done and is in the process of committing.");
+ }
updateCounters();
boolean commitRequired = isCommitRequired();
@@ -848,7 +879,7 @@ abstract public class Task implements Wr
// say the task tracker that task is commit pending
while (true) {
try {
- umbilical.commitPending(taskId, taskStatus);
+ umbilical.commitPending(taskIdForUmbilical, taskStatus);
break;
} catch (InterruptedException ie) {
// ignore
@@ -899,8 +930,14 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
- LOG.warn("Parent died. Exiting "+taskId);
+ //GRR FIXME (later): alternatives to taskIdForUmbilical would be
+ // (1) include taskId as part of umbilical object and protocol;
+ // (2) include taskId as part of taskStatus
+ // (3) extend TaskAttemptID (or create related Task inner class?) to
+          //     include taskAttemptId() and taskAttemptIdForUmbilical()
+          //     methods that are overridden in uber context [Dick]
+ if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) {
+ LOG.warn("Parent died. Exiting " + taskId);
System.exit(66);
}
taskStatus.clearStatus();
@@ -936,7 +973,7 @@ abstract public class Task implements Wr
* @return -1 if it can't be found.
*/
private long calculateOutputSize() throws IOException {
- if (!isMapOrReduce()) {
+ if (!isMapOrReduce() || isUberTask()) {
return -1;
}
@@ -956,8 +993,13 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- umbilical.done(getTaskID());
- LOG.info("Task '" + taskId + "' done.");
+ umbilical.done(taskIdForUmbilical);
+ if (isUberTask()) {
+ LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId
+ + "' done.");
+ } else {
+ LOG.info("Task '" + taskId + "' done.");
+ }
return;
} catch (IOException ie) {
LOG.warn("Failure signalling completion: " +
@@ -976,7 +1018,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- while (!umbilical.canCommit(taskId)) {
+ while (!umbilical.canCommit(taskIdForUmbilical)) {
try {
Thread.sleep(1000);
} catch(InterruptedException ie) {
@@ -1033,7 +1075,7 @@ abstract public class Task implements Wr
setPhase(TaskStatus.Phase.CLEANUP);
getProgress().setStatus("cleanup");
statusUpdate(umbilical);
- LOG.info("Runnning cleanup for the task");
+ LOG.info("Running cleanup for the task");
// do the cleanup
committer.abortTask(taskContext);
}
@@ -1058,8 +1100,10 @@ abstract public class Task implements Wr
oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
}
} else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
- LOG.info("Committing job");
- committer.commitJob(jobContext);
+ // delete <outputdir>/_temporary and optionally create _SUCCESS file
+ if (!isUberTask()) { // defer since output files have not yet been saved
+ commitJob();
+ }
} else {
throw new IOException("Invalid state of the job for cleanup. State found "
+ jobRunStateForCleanup + " expecting "
@@ -1068,20 +1112,31 @@ abstract public class Task implements Wr
+ JobStatus.State.KILLED);
}
- // delete the staging area for the job
+ // delete the staging area for the job (e.g.,
+ // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/
+ // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir)
JobConf conf = new JobConf(jobContext.getConfiguration());
if (!supportIsolationRunner(conf)) {
- String jobTempDir = conf.get("mapreduce.job.dir");
- Path jobTempDirPath = new Path(jobTempDir);
- FileSystem fs = jobTempDirPath.getFileSystem(conf);
- fs.delete(jobTempDirPath, true);
+ String jobStagingDir = conf.get("mapreduce.job.dir");
+ Path jobStagingDirPath = new Path(jobStagingDir);
+ FileSystem fs = jobStagingDirPath.getFileSystem(conf);
+ fs.delete(jobStagingDirPath, true);
+ }
+ // update counters, save any pending output files, shut down the progress-
+ // reporter communication thread and the umbilical, and mark the task done
+ if (!isUberTask()) { // defer so UberTask can send TT final update(s)
+ done(umbilical, reporter);
}
- done(umbilical, reporter);
}
-
+
+ protected void commitJob() throws IOException {
+ LOG.info("Committing job");
+ committer.commitJob(jobContext);
+ }
+
protected boolean supportIsolationRunner(JobConf conf) {
- return (conf.getKeepTaskFilesPattern() != null || conf
- .getKeepFailedTaskFiles());
+ return (conf.getKeepTaskFilesPattern() != null ||
+ conf.getKeepFailedTaskFiles());
}
protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
@@ -1090,9 +1145,12 @@ abstract public class Task implements Wr
// do the setup
getProgress().setStatus("setup");
committer.setupJob(jobContext);
- done(umbilical, reporter);
+ if (!isUberTask()) {
+ // UberTask calls done() directly; don't shut down umbilical prematurely
+ done(umbilical, reporter);
+ }
}
-
+
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar 8 05:54:13 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.Node;
@@ -62,7 +62,7 @@ import org.apache.hadoop.net.Node;
* **************************************************************
*/
class TaskInProgress {
- static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
+ static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
int maxTaskAttempts = 4;
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
@@ -75,14 +75,19 @@ class TaskInProgress {
// Defines the TIP
private String jobFile = null;
- private TaskSplitMetaInfo splitInfo;
+ private TaskSplitMetaInfo[] splitInfo;
private int numMaps;
+ private int numReduces;
private int partition;
private JobTracker jobtracker;
private JobHistory jobHistory;
private TaskID id;
private JobInProgress job;
private final int numSlotsRequired;
+ private boolean jobCleanup = false;
+ private boolean jobSetup = false;
+ private boolean isUber = false;
+ private boolean jobSetupCleanupNeeded = false; // UberTasks only
// Status of the TIP
private int successEventNumber = -1;
@@ -100,8 +105,6 @@ class TaskInProgress {
private long maxSkipRecords = 0;
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
- private boolean jobCleanup = false;
- private boolean jobSetup = false;
private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
@@ -169,7 +172,8 @@ class TaskInProgress {
JobInProgress job, int partition,
int numSlotsRequired) {
this.jobFile = jobFile;
- this.splitInfo = split;
+ this.splitInfo = new TaskSplitMetaInfo[1];
+ this.splitInfo[0] = split;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
@@ -183,7 +187,7 @@ class TaskInProgress {
}
this.user = job.getUser();
}
-
+
/**
* Constructor for ReduceTask
*/
@@ -265,7 +269,36 @@ class TaskInProgress {
}
}
}
-
+
+ /**
+ * Constructor for UberTask
+ */
+ public TaskInProgress(JobID jobid, String jobFile,
+ TaskSplitMetaInfo[] splits,
+ int numMaps, int numReduces,
+ int partition, JobTracker jobtracker, JobConf conf,
+ JobInProgress job, int numSlotsRequired,
+ boolean jobSetupCleanupNeeded) {
+ this.isUber = true;
+ this.jobFile = jobFile;
+ this.splitInfo = splits;
+ this.numMaps = numMaps;
+ this.numReduces = numReduces;
+ this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
+ this.jobtracker = jobtracker;
+ this.job = job;
+ this.conf = conf;
+ this.partition = partition;
+ this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+ this.numSlotsRequired = numSlotsRequired;
+ setMaxTaskAttempts();
+ init(jobid);
+ if (jobtracker != null) {
+ this.jobHistory = jobtracker.getJobHistory();
+ }
+ this.user = job.getUser();
+ }
+
/**
* Set the max number of attempts before we declare a TIP as "failed"
*/
@@ -276,15 +309,6 @@ class TaskInProgress {
this.maxTaskAttempts = conf.getMaxReduceAttempts();
}
}
-
- /**
- * Return the index of the tip within the job, so
- * "task_200707121733_1313_0002_m_012345" would return 12345;
- * @return int the tip index
- */
- public int idWithinJob() {
- return partition;
- }
public boolean isJobCleanupTask() {
return jobCleanup;
@@ -392,19 +416,32 @@ class TaskInProgress {
public JobInProgress getJob() {
return job;
}
+
/**
* Return an ID for this task, not its component taskid-threads
*/
public TaskID getTIPId() {
return this.id;
}
+
/**
- * Whether this is a map task
+   * Whether this is a map task.  Note that ubertasks return false here so
+   * they can run in a (larger) reduce slot.  (Setup and cleanup tasks may
+   * return either true or false.)
*/
public boolean isMapTask() {
- return splitInfo != null;
+ return splitInfo != null && !isUber;
}
-
+
+ /**
+ * Whether this is an ubertask, i.e., a meta-task that contains a handful
+ * of map tasks and (at most) a single reduce task. Note that ubertasks
+ * are seen as reduce tasks in most contexts.
+ */
+ public boolean isUberTask() {
+ return isUber;
+ }
+
/**
* Returns the {@link TaskType} of the {@link TaskAttemptID} passed.
* The type of an attempt is determined by the nature of the task and not its
@@ -929,15 +966,16 @@ class TaskInProgress {
}
/**
- * Get the split locations
+ * Get the split locations
*/
public String[] getSplitLocations() {
+    // GRR FIXME? may need to add "( .. || isUberTask())" if ever called for
+    // uber (but locations for which split? all of them?)
if (isMapTask() && !jobSetup && !jobCleanup) {
- return splitInfo.getLocations();
+ return splitInfo[0].getLocations();
}
return new String[0];
}
-
+
/**
* Get the Status of the tasks managed by this TIP
*/
@@ -1120,7 +1158,7 @@ class TaskInProgress {
//set this the first time we run a taskAttempt in this TIP
//each Task attempt has its own TaskStatus, which tracks that
- //attempts execStartTime, thus this startTime is TIP wide.
+ //attempt's execStartTime, thus this startTime is TIP wide.
if (0 == execStartTime){
setExecStartTime(lastDispatchTime);
}
@@ -1132,8 +1170,9 @@ class TaskInProgress {
}
/**
- * Adds a previously running task to this tip. This is used in case of
- * jobtracker restarts.
+ * Creates a task or recreates a previously running one and adds it to this
+ * tip. The latter is used in case of jobtracker restarts. This is the
+ * ultimate source of Task objects in a normal Hadoop setup.
*/
public Task addRunningTask(TaskAttemptID taskid,
String taskTracker,
@@ -1147,10 +1186,20 @@ class TaskInProgress {
LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ failedRanges.getIndicesCount());
}
- t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
- numSlotsNeeded);
+ t = new MapTask(jobFile, taskid, partition,
+ splitInfo[0].getSplitIndex(), numSlotsNeeded);
} else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+ if (isUberTask()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Launching actual UberTask (" + numMaps + " maps, "
+ + numReduces + " reduces)");
+ }
+ // numMaps is implicit in size of splitIndex array:
+ t = new UberTask(jobFile, taskid, partition, getSplitIndexArray(),
+ numReduces, numSlotsNeeded, jobSetupCleanupNeeded);
+ } else {
+ t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+ }
}
if (jobCleanup) {
t.setJobCleanupTask();
@@ -1187,6 +1236,17 @@ class TaskInProgress {
return t;
}
+ // GRR FIXME? more efficient just to pass splitInfo directly...any need for
+ // rest of it in UberTask?
+ TaskSplitIndex[] getSplitIndexArray() {
+ int numSplits = splitInfo.length;
+ TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits];
+ for (int i = 0; i < numSplits; ++i) {
+ splitIndex[i] = splitInfo[i].getSplitIndex();
+ }
+ return splitIndex;
+ }
+
boolean isRunningTask(TaskAttemptID taskid) {
TaskStatus status = taskStatuses.get(taskid);
return status != null && status.getRunState() == TaskStatus.State.RUNNING;
@@ -1236,7 +1296,8 @@ class TaskInProgress {
}
/**
- * Get the id of this map or reduce task.
+ * Get the task index of this map or reduce task. For example,
+ * "task_201011230308_87259_r_000240" would return 240.
* @return The index of this tip in the maps/reduces lists.
*/
public int getIdWithinJob() {
@@ -1256,7 +1317,7 @@ class TaskInProgress {
public int getSuccessEventNumber() {
return successEventNumber;
}
-
+
/**
* Gets the Node list of input split locations sorted in rack order.
*/
@@ -1264,7 +1325,7 @@ class TaskInProgress {
if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
- String[] splits = splitInfo.getLocations();
+ String[] splits = splitInfo[0].getLocations(); // actually replicas
Node[] nodes = new Node[splits.length];
for (int i = 0; i < splits.length; i++) {
nodes[i] = jobtracker.getNode(splits[i]);
@@ -1293,13 +1354,20 @@ class TaskInProgress {
}
public long getMapInputSize() {
- if(isMapTask() && !jobSetup && !jobCleanup) {
- return splitInfo.getInputDataLength();
+ if (isUberTask()) {
+ int numSplits = splitInfo.length;
+ long sumInputDataLength = 0;
+ for (int i = 0; i < numSplits; ++i) {
+ sumInputDataLength += splitInfo[i].getInputDataLength();
+ }
+ return sumInputDataLength;
+ } else if (isMapTask() && !jobSetup && !jobCleanup) {
+ return splitInfo[0].getInputDataLength();
} else {
return 0;
}
}
-
+
/**
* Compare most recent task attempts dispatch time to current system time so
* that task progress rate will slow down as time proceeds even if no progress
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar 8 05:54:13 2011
@@ -41,10 +41,15 @@ public abstract class TaskStatus impleme
static final Log LOG =
LogFactory.getLog(TaskStatus.class.getName());
- //enumeration for reporting current phase of a task.
+ // what kind of TaskStatus is it?
@InterfaceAudience.Private
@InterfaceStability.Unstable
- public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
+ public static enum Type {MAP, REDUCE, UBER}
+
+ // enumeration for reporting current phase of a task.
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static enum Phase {STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
// what state is the task in?
@InterfaceAudience.Private
@@ -105,6 +110,7 @@ public abstract class TaskStatus impleme
public TaskAttemptID getTaskID() { return taskid; }
public abstract boolean getIsMap();
+ public abstract boolean getIsUber();
public int getNumSlots() {
return numSlots;
}
@@ -495,46 +501,61 @@ public abstract class TaskStatus impleme
//////////////////////////////////////////////////////////////////////////////
// Factory-like methods to create/read/write appropriate TaskStatus objects
//////////////////////////////////////////////////////////////////////////////
-
- static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId,
- float progress, int numSlots,
- State runState, String diagnosticInfo,
- String stateString, String taskTracker,
- Phase phase, Counters counters)
- throws IOException {
- boolean isMap = in.readBoolean();
- return createTaskStatus(isMap, taskId, progress, numSlots, runState,
- diagnosticInfo, stateString, taskTracker, phase,
- counters);
- }
-
+
+ // this is the main one used by TT.TIP, JIP, and (apparently) all the
+ // relevant tests (CapacityTestUtils, TestFairScheduler, TestClusterStatus,
+ // TestJobInProgress, TestJobTrackerInstrumentation, FakeObjectUtilities)
+ // [also formerly MapTask and ReduceTask, but no longer]
static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId,
float progress, int numSlots,
State runState, String diagnosticInfo,
String stateString, String taskTracker,
Phase phase, Counters counters) {
return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState,
- diagnosticInfo, stateString, taskTracker,
+ diagnosticInfo, stateString, taskTracker,
phase, counters) :
new ReduceTaskStatus(taskId, progress, numSlots, runState,
diagnosticInfo, stateString,
taskTracker, phase, counters);
}
-
- static TaskStatus createTaskStatus(boolean isMap) {
- return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
+
+ // used only in default ctors of Task (also formerly MapTask, ReduceTask) and
+ // readTaskStatus() below
+ static TaskStatus createTaskStatus(Type tsType) {
+ return (tsType == TaskStatus.Type.MAP)
+ ? new MapTaskStatus()
+ : (tsType == TaskStatus.Type.REDUCE)
+ ? new ReduceTaskStatus()
+ : new UberTaskStatus();
}
static TaskStatus readTaskStatus(DataInput in) throws IOException {
boolean isMap = in.readBoolean();
- TaskStatus taskStatus = createTaskStatus(isMap);
+ boolean isUber = in.readBoolean();
+ Type tsType = isMap
+ ? TaskStatus.Type.MAP
+ : isUber
+ ? TaskStatus.Type.UBER
+ : TaskStatus.Type.REDUCE;
+ TaskStatus taskStatus = createTaskStatus(tsType);
taskStatus.readFields(in);
return taskStatus;
}
static void writeTaskStatus(DataOutput out, TaskStatus taskStatus)
throws IOException {
+/* LATER
+ * //GRR FIXME: longer-term, just store tsType as member var (but then need
+ * // to modify or add new ctor: used in many places)
+ * Type tsType = taskStatus.getIsUber()
+ * ? TaskStatus.Type.UBER
+ * : taskStatus.getIsMap()
+ * ? TaskStatus.Type.MAP
+ * : TaskStatus.Type.REDUCE;
+ * WritableUtils.writeEnum(out, tsType); (or enum.ordinal() [as in MR-901] or ...)
+ */
out.writeBoolean(taskStatus.getIsMap());
+ out.writeBoolean(taskStatus.getIsUber());
taskStatus.write(out);
}
}
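The wire format thus leads with two booleans, which readTaskStatus() decodes as follows (a summary of the code above; isMap is checked first, and the (true, true) combination is not expected from any of the concrete status classes):

    // (isMap, isUber) -> decoded type
    //  (true,  false) -> MAP      (MapTaskStatus)
    //  (false, false) -> REDUCE   (ReduceTaskStatus)
    //  (false, true ) -> UBER     (UberTaskStatus)
    //  (true,  true ) -> MAP      (isMap wins; never written in practice)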
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 8 05:54:13 2011
@@ -827,7 +827,7 @@ public class TaskTracker
f = rjob.getFetchStatus();
for (TaskInProgress tip : rjob.tasks) {
Task task = tip.getTask();
- if (!task.isMapTask()) {
+ if (!task.isMapTask() && !task.isUberTask()) {
if (((ReduceTask)task).getPhase() ==
TaskStatus.Phase.SHUFFLE) {
if (rjob.getFetchStatus() == null) {
@@ -2502,21 +2502,27 @@ public class TaskTracker
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
- taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
- 0.0f,
- task.getNumSlotsRequired(),
- task.getState(),
- diagnosticInfo.toString(),
- "initializing",
- getName(),
- task.isTaskCleanupTask() ?
- TaskStatus.Phase.CLEANUP :
- task.isMapTask()? TaskStatus.Phase.MAP:
- TaskStatus.Phase.SHUFFLE,
- task.getCounters());
+ if (task.isUberTask()) {
+ taskStatus = new UberTaskStatus(
+ task.getTaskID(), 0.0f,
+ task.getNumSlotsRequired(), task.getState(),
+ diagnosticInfo.toString(), "initializing", getName(),
+ TaskStatus.Phase.MAP, task.getCounters());
+ } else {
+ taskStatus = TaskStatus.createTaskStatus(
+ task.isMapTask(), task.getTaskID(), 0.0f,
+ task.getNumSlotsRequired(), task.getState(),
+ diagnosticInfo.toString(), "initializing", getName(),
+ task.isTaskCleanupTask()
+ ? TaskStatus.Phase.CLEANUP
+ : task.isMapTask()
+ ? TaskStatus.Phase.MAP
+ : TaskStatus.Phase.SHUFFLE,
+ task.getCounters());
+ }
taskTimeout = (10 * 60 * 1000);
}
-
+
void localizeTask(Task task) throws IOException{
FileSystem localFs = FileSystem.getLocal(fConf);
@@ -2633,7 +2639,7 @@ public class TaskTracker
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
LOG.info("Not launching task: " + task.getTaskID() +
- " since it's state is " + this.taskStatus.getRunState());
+ " since its state is " + this.taskStatus.getRunState());
}
}
@@ -3206,7 +3212,7 @@ public class TaskTracker
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
if (!jvmManager.isJvmKnown(jvmId)) {
- LOG.info("Killing unknown JVM " + jvmId);
+ LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME: bug? no (apparent) killing going on here...
return new JvmTask(null, true);
}
RunningJob rjob = runningJobs.get(jvmId.getJobId());
@@ -3237,7 +3243,7 @@ public class TaskTracker
public synchronized boolean statusUpdate(TaskAttemptID taskid,
TaskStatus taskStatus)
throws IOException {
- TaskInProgress tip = tasks.get(taskid);
+ TaskInProgress tip = tasks.get(taskid); // TT.TIP, not TaskInProgress.java
if (tip != null) {
tip.reportProgress(taskStatus);
myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
@@ -3685,17 +3691,15 @@ public class TaskTracker
}
userName = rjob.jobConf.getUser();
}
- // Index file
- Path indexFileName =
- lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- userName, jobId, mapId)
- + "/file.out.index", conf);
-
- // Map-output file
- Path mapOutputFileName =
- lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- userName, jobId, mapId)
- + "/file.out", conf);
+
+ // Map-output filename ("... /file.out")
+ StringBuilder sb = new StringBuilder();
+ sb.append(TaskTracker.getIntermediateOutputDir(userName, jobId, mapId))
+ .append("/").append(MapOutputFile.MAP_OUTPUT_FILENAME_STRING);
+ Path mapOutputFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
+ // Index filename ("... /file.out.index")
+ sb.append(MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+ Path indexFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
/**
* Read the index file to get the information about where the map-output
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079193&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar 8 05:54:13 2011
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.MRJobConfig; // JobContext.SKIP_RECORDS
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.TaskType; // MAP, JOB_SETUP, TASK_CLEANUP...
+import org.apache.hadoop.util.Progress;
+
+class UberTask extends Task {
+ private TaskSplitIndex[] splits;
+ private int numMapTasks;
+ private int numReduceTasks;
+ private boolean jobSetupCleanupNeeded;
+
+ private static final Log LOG = LogFactory.getLog(UberTask.class.getName());
+
+ private Progress[] subPhases; // persistent storage for MapTasks, ReduceTask
+
+ // "instance initializer": executed between Task and UberTask constructors
+ {
+ // cannot call setPhase() here now that createTaskStatus() is called in
+ // Task subclass(es): initializer is executed before subclass ctor =>
+ // taskStatus still null => NPE
+ getProgress().setStatus("uber"); // Task.java: change name of root Progress
+ }
+
+ public UberTask() {
+ super();
+ this.taskStatus = new UberTaskStatus();
+ }
+
+ public UberTask(String jobFile, TaskAttemptID taskId, int partition,
+ TaskSplitIndex[] splits, int numReduceTasks,
+ int numSlotsRequired, boolean jobSetupCleanupNeeded) {
+ super(jobFile, taskId, partition, numSlotsRequired);
+ this.splits = splits;
+ this.numMapTasks = splits.length;
+ this.numReduceTasks = numReduceTasks;
+ this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
+ this.taskStatus = new UberTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
+ TaskStatus.State.UNASSIGNED,
+ "", "", "", TaskStatus.Phase.MAP,
+ getCounters());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("UberTask " + taskId + " constructed with " + numMapTasks
+ + " sub-maps and " + numReduceTasks + " sub-reduces");
+ }
+ }
+
+ @Override
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
+ throws IOException {
+ return new UberTaskRunner(tip, tracker, conf);
+ }
+
+ /* perhaps someday we'll allow an UberTask to run as either a MapTask or a
+ * ReduceTask, but for now it's the latter only */
+ @Override
+ public boolean isMapTask() {
+ return false;
+ }
+
+ /**
+ * Is this really a combo-task masquerading as a plain ReduceTask? Yup.
+ */
+ @Override
+ public boolean isUberTask() {
+ return true;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ this.umbilical = umbilical;
+
+ // set up two-level Progress/phase tree: getProgress() is root ("uber"),
+ // and subtasks' "root node" Progress is second level (will override
+ // native one when construct each subtask)
+ subPhases = new Progress[numMapTasks + numReduceTasks];
+ for (int j=0; j < numMapTasks; ++j) {
+ subPhases[j] = getProgress().addPhase("map " + String.valueOf(j+1));
+ }
+ for (int j = numMapTasks; j < numMapTasks + numReduceTasks; ++j) {
+ subPhases[j] =
+ getProgress().addPhase("reduce " + String.valueOf(j - numMapTasks + 1));
+ }
+ // we could set up each subtask's phases, too, but would need to store all
+ // (2*numMapTasks + 2*numReduceTasks) of them here, and subtasks already
+ // have storage allocated (mapPhase, sortPhase, copyPhase, reducePhase) in
+ // MapTask and ReduceTask--instead, will call new accessor for each after
+ // each subtask is created
+
+ // Start thread that will handle communication with parent. Note that this
+ // is NOT the reporter the subtasks will use--we want them to get one that
+ // knows nothing of umbilical, so that calls to it will pass through us,
+ // changing the task ID to our own (UberTask's) before sending progress on
+ // up via this reporter. (No need for the subtask reporter also to adjust
+ // the progress percentage; we get that for free from the phase tree.)
+ TaskReporter reporter = startReporter(umbilical);
+
+ // use context objects API?
+ boolean useNewApi = job.getUseNewMapper(); // "mapred.mapper.new-api"
+ assert useNewApi == job.getUseNewReducer(); // enforce consistency
+
+ // initialize the ubertask (sole "real" task as far as framework is
+ // concerned); this is where setupTask() is called
+ initialize(job, getJobID(), reporter, useNewApi);
+
+ // Generate the map TaskAttemptIDs we need to run.
+ // Regular tasks are handed their TaskAttemptIDs via TaskInProgress's
+ // getTaskToRun() and addRunningTask(), but that approach doesn't work
+ // for us. Ergo, we create our own--technically bypassing the nextTaskId
+ // limits in getTaskToRun(), but since the point of UberTask is to roll
+ // up too-small jobs into a single, more normal-sized ubertask (whose
+ // single TaskAttemptID _is_ subject to the limits), that's reasonable.
+ TaskAttemptID[] mapIds = createMapIds();
+
+ // set up the job
+ if (jobSetupCleanupNeeded) {
+ runSetupJob(reporter);
+ }
+
+ // run the maps
+ if (numMapTasks > 0) {
+ runUberMapTasks(job, mapIds, splits, umbilical, reporter);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("UberTask " + getTaskID() + " has no sub-MapTasks to run");
+ }
+ }
+
+ if (numReduceTasks > 0) {
+ // may make sense to lift this restriction at some point, but for now
+ // code is written to support one at most:
+ if (numReduceTasks > 1) {
+ throw new IOException("UberTask invoked with " + numReduceTasks
+ + " reduces (1 max)");
+ }
+
+ // set up the reduce ...
+ Class keyClass = job.getMapOutputKeyClass();
+ Class valueClass = job.getMapOutputValueClass();
+ RawComparator comparator = job.getOutputValueGroupingComparator();
+
+ // ... then run it (using our own [reduce] TaskAttemptID)
+ runUberReducer(job, getTaskID(), mapIds.length, umbilical, reporter,
+ comparator, keyClass, valueClass);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("UberTask " + getTaskID() + " has no sub-ReduceTasks to run");
+ }
+ }
+
+ // clean up the job (switch phase to "cleanup" and delete staging dir, but
+ // do NOT delete temp dir yet)
+ if (jobSetupCleanupNeeded) {
+ runCommitAbortJob(reporter);
+ }
+
+ // this is where commitTask() (or abortTask()) is called
+ done(umbilical, reporter);
+
+ // now finish cleaning up the job (delete temp dir: results are committed)
+ if (jobSetupCleanupNeeded) {
+ commitJob();
+ }
+ }
+
+ private TaskAttemptID[] createMapIds() {
+ TaskAttemptID[] mapIds = new TaskAttemptID[numMapTasks];
+ // Note that the reducer always starts looking for ID 0 (output/map_0.out),
+ // so it's not possible (or at least not easy) to add an offset to the
+ // mapIds. However, since ~nobody but us ever sees them (thanks to
+ // SubTaskReporter translation below--progress is reported to TT using
+ // UberTask's ID), it's OK to overlap our own task ID and potentially
+ // those of the setup and cleanup tasks.
+ for (int j = 0; j < mapIds.length; ++j) {
+ mapIds[j] = new TaskAttemptID(new TaskID(getJobID(), TaskType.MAP, j), 0);
+ }
+ return mapIds;
+ }
+
+  // GRR PERF TODO: make sure we're not crossing disk boundaries
+  // (optimization: just renaming map outputs to reduce inputs without doing
+  // the in-memory/disk-spill shuffle thing)
+ private void renameMapOutputForReduce(TaskAttemptID mapId,
+ MapOutputFile subMapOutputFile)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf); //PERF FIXME? could pass in
+ // move map output to reduce input
+ Path mapOut = subMapOutputFile.getOutputFile();
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
+ mapId.getTaskID(), localFs.getLength(mapOut));
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ }
+
+ private void runSetupJob(TaskReporter reporter)
+ throws IOException, InterruptedException {
+ runJobSetupTask(umbilical, reporter);
+ }
+
+ private void runCommitAbortJob(TaskReporter reporter)
+ throws IOException, InterruptedException {
+ // if we (uber) got this far without _ourselves_ being whacked, then we've
+ // succeeded
+ setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+ runJobCleanupTask(umbilical, reporter);
+ }
+
+ /**
+ * This is basically an uber-specific version of MapTask's run() method.
+ * It loops over the map subtasks sequentially. runUberReducer() (below)
+ * is the corresponding replacement for ReduceTask's run().
+ */
+ private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runUberMapTasks(final JobConf job,
+ final TaskAttemptID[] mapIds,
+ final TaskSplitIndex[] splits,
+ final TaskUmbilicalProtocol umbilical,
+ TaskReporter reporter)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ boolean useNewApi = job.getUseNewMapper(); // use context objects API?
+
+ for (int j=0; j < mapIds.length; ++j) {
+ MapTask map = new MapTask(getJobFile(), mapIds[j], j, splits[j], 1);
+ JobConf localConf = new JobConf(job);
+ map.localizeConfiguration(localConf);
+ map.setConf(localConf);
+ // for reporting purposes (to TT), use our (uber) task ID, not subtask's:
+ map.setTaskIdForUmbilical(getTaskID());
+
+ // override MapTask's "root" Progress node with our second-level one...
+ map.setProgress(subPhases[j]);
+ // ...and add two third-level Progress nodes
+ map.createPhase(TaskStatus.Phase.MAP, "map", 0.667f);
+ map.createPhase(TaskStatus.Phase.SORT, "sort", 0.333f);
+
+ TaskReporter subReporter =
+ new SubTaskReporter(map.getProgress(), reporter, j);
+ map.initialize(localConf, getJobID(), subReporter, useNewApi);
+
+ LOG.info("UberTask " + getTaskID() + " running sub-MapTask " + (j+1)
+ + "/" + numMapTasks);
+
+ if (useNewApi) {
+ MapTask.runNewMapper(map, localConf, splits[j], umbilical, subReporter);
+ } else {
+ MapTask.runOldMapper(map, localConf, splits[j], umbilical, subReporter);
+ }
+ updateCounters(map);
+
+ // Set own progress to 1.0 and move to next sibling node in Progress/phase
+ // tree. NOTE: this works but is slightly fragile. Sibling doesn't
+ // yet exist, but since internal startNextPhase() call merely updates
+ // currentPhase index without "dereferencing" it, this is OK as long as
+ // no one calls phase() on the parent Progress (or get()?) in the interim.
+ map.getProgress().complete();
+
+ // Signal the communication thread to pass any progress on up to the TT.
+ // (This and the renameMapOutputForReduce() optimization below are the
+ // sole bits of the output committer's "commitTask()" method that we actually
+ // want/need, so consider the subtask committed at this point.)
+ reporter.progress();
+
+ // every map will produce file.out (in the same dir), so rename as we go
+ //GRR FIXME: is this conditionalized approach a behavior change? do map-only
+ // jobs produce file.out or map_#.out or nothing at all? if not renamed
+ // here, do we suddenly lose data that we used to preserve?
+ if (numReduceTasks > 0) {
+ renameMapOutputForReduce(mapIds[j], map.getMapOutputFile());
+ }
+ }
+ }
+
+ /**
+ * This is basically an uber-specific version of ReduceTask's run() method.
+ * It currently supports only a single reducer (or none, in the trivial sense
+ * of not being called in that case).
+ */
+ @SuppressWarnings("unchecked")
+ private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runUberReducer(JobConf job, TaskAttemptID reduceId, int numMaps,
+ final TaskUmbilicalProtocol umbilical,
+ final TaskReporter reporter,
+ RawComparator<INKEY> comparator,
+ Class<INKEY> keyClass,
+ Class<INVALUE> valueClass)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ boolean useNewApi = job.getUseNewReducer(); // use context objects API?
+ ReduceTask reduce = new ReduceTask(getJobFile(), reduceId, 0, numMaps, 1);
+ JobConf localConf = new JobConf(job);
+ reduce.localizeConfiguration(localConf);
+ reduce.setConf(localConf);
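+ // propagate the UberTask's skip-records setting into the sub-ReduceTask's
+ // localized conf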
+ localConf.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
+
+ // override ReduceTask's "root" Progress node with our second-level one...
+ reduce.setProgress(subPhases[numMapTasks + 0]);
+ // ...and add two third-level Progress nodes (SHUFFLE/"copy" is unnecessary)
+ reduce.createPhase(TaskStatus.Phase.SORT, "sort");
+ reduce.createPhase(TaskStatus.Phase.REDUCE, "reduce");
+
+ // subtaskIndex of reduce is one bigger than that of last map,
+ // i.e., (numMapTasks-1) + 1
+ TaskReporter subReporter =
+ new SubTaskReporter(reduce.getProgress(), reporter, numMapTasks);
+ reduce.initialize(localConf, getJobID(), subReporter, useNewApi);
+
+ LOG.info("UberTask " + getTaskID() + " running sub-ReduceTask 1/"
+ + numReduceTasks);
+
+ // note that this is implicitly the "isLocal" branch of ReduceTask's run():
+ // we don't have a shuffle phase
+ // (=> should we skip adding the phase in the first place and use a 50/50
+ // split? FIXME)
+ final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+ RawKeyValueIterator rIter =
+ Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(), null, // no codec
+ ReduceTask.getMapFiles(reduce, rfs, true),
+ !conf.getKeepFailedTaskFiles(),
+ job.getInt(JobContext.IO_SORT_FACTOR, 100),
+ new Path(getTaskID().toString()),
+ job.getOutputKeyComparator(),
+ subReporter, spilledRecordsCounter,
+ null, null); // no writesCounter or mergePhase
+
+ // set progress = 1.0 and move _parent's_ index to next sibling phase:
+ reduce.completePhase(TaskStatus.Phase.SORT); // "sortPhase.complete()"
+ reduce.taskStatus.setPhase(TaskStatus.Phase.REDUCE);
+
+ if (useNewApi) {
+ ReduceTask.runNewReducer(reduce, job, umbilical, subReporter,
+ rIter, comparator, keyClass, valueClass);
+ } else {
+ ReduceTask.runOldReducer(reduce, job, umbilical, subReporter,
+ rIter, comparator, keyClass, valueClass);
+ }
+ updateCounters(reduce);
+
+ // set own progress to 1.0 and move to [nonexistent] next sibling node in
+ // Progress/phase tree; this will cause parent node's progress (UberTask's)
+ // to be set to 1.0, too (at least, assuming all previous siblings have
+ // done so, too...Progress/phase stuff is fragile in more ways than one)
+ reduce.getProgress().complete();
+
+ // signal the communication thread to pass any progress on up to the TT
+ reporter.progress();
+ }
+
+ /**
+ * Updates uber-counters with values from completed subtasks.
+ * @param subtask a map or reduce subtask that has just been successfully
+ * completed
+ */
+ private void updateCounters(Task subtask) {
+ getCounters().incrAllCounters(subtask.getCounters());
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ if (isMapOrReduce()) { //GRR Q: why would this ever NOT be true?
+ out.writeBoolean(jobSetupCleanupNeeded);
+ WritableUtils.writeVInt(out, numMapTasks);
+ WritableUtils.writeVInt(out, numReduceTasks);
+ for (TaskSplitIndex splitIndex : splits) {
+ splitIndex.write(out);
+ }
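+ // no need to hang on to the split metadata once it has been serialized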
+ splits = null;
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ if (isMapOrReduce()) { //GRR Q: why would this ever NOT be true?
+ jobSetupCleanupNeeded = in.readBoolean();
+ numMapTasks = WritableUtils.readVInt(in);
+ numReduceTasks = WritableUtils.readVInt(in);
+ splits = new TaskSplitIndex[numMapTasks];
+ for (int j=0; j < numMapTasks; ++j) {
+ TaskSplitIndex splitIndex = new TaskSplitIndex();
+ splitIndex.readFields(in);
+ splits[j] = splitIndex;
+ }
+ }
+ }
+
+ /**
+ * In our superclass, the communication thread handles communication with
+ * the parent (TaskTracker) via the umbilical, but that works only if the
+ * TaskTracker is aware of the task's ID--which is true of us (UberTask)
+ * but not our map and reduce subtasks. Ergo, intercept subtasks' progress
+ * reports and pass them on as our own, i.e., use our own uber-taskID in
+ * place of the subtasks' bogus ones. (Leave the progress percentage alone;
+ * the phase/Progress hierarchy we set up in run() and runUber*() will take
+ * care of that.)
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ protected class SubTaskReporter extends Task.TaskReporter
+ implements Runnable, Reporter {
+
+ private Progress taskProgress;
+ private TaskReporter uberReporter;
+ private int subtaskIndex;
+ // subtaskIndex goes from 0 to (m-1+r), where m = number of maps and
+ // r = number of reduces. (The latter can be either 0 or 1, and m+r >= 1.)
+
+ SubTaskReporter(Progress progress, TaskReporter reporter, int subtaskIdx) {
+ super(progress, null);
+ this.taskProgress = progress;
+ this.uberReporter = reporter;
+ this.subtaskIndex = subtaskIdx;
+ }
+
+ @Override
+ public void run() {
+ // make sure this never gets called...
+ LOG.fatal("UberTask " + getTaskID() + " SubTaskReporter run() called "
+ + "unexpectedly for subtask index " + subtaskIndex);
+ assert "uh oh: SubTaskReporter's run() method was called!".isEmpty();
+ }
+
+ // just one (real) intercepted method
+ @Override
+ public void setProgress(float progress) {
+ // update _our_ taskProgress [no need to do uber's, too: ultimately does
+ // get() on uber's taskProgress], but set _uberReporter's_ progress flag
+ taskProgress.phase().set(progress);
+ uberReporter.setProgressFlag();
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java?rev=1079193&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java Tue Mar 8 05:54:13 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.log4j.Level;
+
+public class UberTaskRunner extends TaskRunner {
+
+ public UberTaskRunner(TaskInProgress tip, TaskTracker tracker, JobConf conf) {
+ super(tip, tracker, conf);
+ }
+
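+ /**
+ * Uber-specific child-JVM opts take precedence; otherwise fall back to the
+ * generic task opts. (Note that the passed-in defaultValue is ignored in
+ * favor of the stock default.)
+ */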
+ @Override
+ public String getChildJavaOpts(JobConf jobConf, String defaultValue) {
+ return jobConf.get(JobConf.MAPRED_UBER_TASK_JAVA_OPTS,
+ super.getChildJavaOpts(jobConf,
+ JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
+ }
+
+ @Override
+ public int getChildUlimit(JobConf jobConf) {
+ return jobConf.getInt(JobConf.MAPRED_UBER_TASK_ULIMIT,
+ super.getChildUlimit(jobConf));
+ }
+
+ @Override
+ public String getChildEnv(JobConf jobConf) {
+ return jobConf.get(JobConf.MAPRED_UBER_TASK_ENV,
+ super.getChildEnv(jobConf));
+ }
+
+ @Override
+ public Level getLogLevel(JobConf jobConf) {
+ //GRR FIXME: what if this differs from JobConf.MAPRED_MAP_TASK_LOG_LEVEL?
+ return Level.toLevel(jobConf.get(JobConf.MAPRED_REDUCE_TASK_LOG_LEVEL,
+ JobConf.DEFAULT_LOG_LEVEL.toString()));
+ }
+
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java?rev=1079193&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskStatus.java Tue Mar 8 05:54:13 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+class UberTaskStatus extends TaskStatus {
+
+ private long mapFinishTime;
+ // (the map-side analogue of sortFinishTime is essentially irrelevant, so
+ // we don't track one)
+
+ private long shuffleFinishTime;
+ private long sortFinishTime;
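+ // fetch-failure reports forwarded from the sub-ReduceTask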
+ private List<TaskAttemptID> failedFetchTasks = new ArrayList<TaskAttemptID>(1);
+
+ public UberTaskStatus() {}
+
+ public UberTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
+ State runState, String diagnosticInfo,
+ String stateString, String taskTracker, Phase phase,
+ Counters counters) {
+ super(taskid, progress, numSlots, runState, diagnosticInfo, stateString,
+ taskTracker, phase, counters);
+ }
+
+ @Override
+ public Object clone() {
+ UberTaskStatus myClone = (UberTaskStatus)super.clone();
+ myClone.failedFetchTasks = new ArrayList<TaskAttemptID>(failedFetchTasks);
+ return myClone;
+ }
+
+ @Override
+ public boolean getIsMap() {
+ return false;
+ }
+
+ @Override
+ public boolean getIsUber() {
+ return true;
+ }
+
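+ /**
+ * Backfill any per-phase finish times that were never set explicitly, so
+ * the recorded timeline is complete when the overall task finishes.
+ */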
+ @Override
+ void setFinishTime(long finishTime) {
+ if (mapFinishTime == 0) {
+ this.mapFinishTime = finishTime;
+ }
+ if (shuffleFinishTime == 0) {
+ this.shuffleFinishTime = finishTime;
+ }
+ if (sortFinishTime == 0){
+ this.sortFinishTime = finishTime;
+ }
+ super.setFinishTime(finishTime);
+ }
+
+ @Override
+ public long getMapFinishTime() {
+ return mapFinishTime;
+ }
+
+ @Override
+ void setMapFinishTime(long mapFinishTime) {
+ this.mapFinishTime = mapFinishTime;
+ }
+
+ @Override
+ public long getShuffleFinishTime() {
+ return shuffleFinishTime;
+ }
+
+ @Override
+ void setShuffleFinishTime(long shuffleFinishTime) {
+ if (mapFinishTime == 0) {
+ this.mapFinishTime = shuffleFinishTime;
+ }
+ this.shuffleFinishTime = shuffleFinishTime;
+ }
+
+ @Override
+ public long getSortFinishTime() {
+ return sortFinishTime;
+ }
+
+ @Override
+ void setSortFinishTime(long sortFinishTime) {
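+ // sort finishing implies that the map and shuffle phases finished, too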
+ if (mapFinishTime == 0) {
+ this.mapFinishTime = sortFinishTime;
+ }
+ if (shuffleFinishTime == 0) {
+ this.shuffleFinishTime = sortFinishTime;
+ }
+ this.sortFinishTime = sortFinishTime;
+ }
+
+ @Override
+ public List<TaskAttemptID> getFetchFailedMaps() {
+ return failedFetchTasks;
+ }
+
+ @Override
+ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+ failedFetchTasks.add(mapTaskId);
+ }
+
+ @Override
+ synchronized void statusUpdate(TaskStatus status) {
+ super.statusUpdate(status);
+
+ if (status.getIsMap()) { // status came from a sub-MapTask
+ if (status.getMapFinishTime() != 0) {
+ this.mapFinishTime = status.getMapFinishTime();
+ }
+
+ } else { // status came from a sub-ReduceTask
+ if (status.getShuffleFinishTime() != 0) {
+ this.shuffleFinishTime = status.getShuffleFinishTime();
+ }
+
+ if (status.getSortFinishTime() != 0) {
+ sortFinishTime = status.getSortFinishTime();
+ }
+
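+ // merge in any newly reported fetch failures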
+ List<TaskAttemptID> newFetchFailedMaps = status.getFetchFailedMaps();
+ if (failedFetchTasks == null) {
+ failedFetchTasks = newFetchFailedMaps;
+ } else if (newFetchFailedMaps != null){
+ failedFetchTasks.addAll(newFetchFailedMaps);
+ }
+ }
+ }
+
+ @Override
+ synchronized void clearStatus() {
+ super.clearStatus();
+ failedFetchTasks.clear();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ mapFinishTime = in.readLong();
+ shuffleFinishTime = in.readLong();
+ sortFinishTime = in.readLong();
+ int numFailedFetchTasks = in.readInt();
+ failedFetchTasks = new ArrayList<TaskAttemptID>(numFailedFetchTasks);
+ for (int i=0; i < numFailedFetchTasks; ++i) {
+ TaskAttemptID taskId = new TaskAttemptID();
+ taskId.readFields(in);
+ failedFetchTasks.add(taskId);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(mapFinishTime);
+ out.writeLong(shuffleFinishTime);
+ out.writeLong(sortFinishTime);
+ out.writeInt(failedFetchTasks.size());
+ for (TaskAttemptID taskId : failedFetchTasks) {
+ taskId.write(out);
+ }
+ }
+
+}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar 8 05:54:13 2011
@@ -271,4 +271,19 @@ public interface MRJobConfig {
"mapreduce.job.submithostname";
public static final String JOB_SUBMITHOSTADDR =
"mapreduce.job.submithostaddress";
+
+ public static final String JOB_UBERTASK_ENABLE =
+ "mapreduce.job.ubertask.enable";
+ public static final String JOB_UBERTASK_MAXMAPS =
+ "mapreduce.job.ubertask.maxmaps";
+ public static final String JOB_UBERTASK_MAXREDUCES =
+ "mapreduce.job.ubertask.maxreduces";
+ public static final String JOB_UBERTASK_MAXBYTES =
+ "mapreduce.job.ubertask.maxbytes";
+ public static final String UBERTASK_JAVA_OPTS =
+ "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts?
+ public static final String UBERTASK_ULIMIT =
+ "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit?
+ public static final String UBERTASK_ENV =
+ "mapreduce.ubertask.child.env"; // or mapreduce.uber.env?
}
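
For illustration, a minimal sketch of a client opting a job into uber mode via
the new keys (the class name, threshold values, and job wiring here are
hypothetical; the Configuration and Job calls are the stock MapReduce client
API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class UberModeExample {                            // hypothetical class
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // enable uber mode and bound the job size that qualifies for it
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);       // illustrative
    conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);    // illustrative
    conf.setLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, 64L * 1024 * 1024);

    Job job = new Job(conf, "uber-example");
    // ...set mapper/reducer/input/output as usual, then job.submit()...
  }
}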
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar 8 05:54:13 2011
@@ -64,6 +64,9 @@
{"name": "launchTime", "type": "long"},
{"name": "totalMaps", "type": "int"},
{"name": "totalReduces", "type": "int"},
+ {"name": "isUber", "type": "boolean"},
+ {"name": "numUberSubMaps", "type": "int"},
+ {"name": "numUberSubReduces", "type": "int"},
{"name": "jobStatus", "type": "string"}
]
},
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=1079193&r1=1079192&r2=1079193&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Tue Mar 8 05:54:13 2011
@@ -526,6 +526,8 @@ public class HistoryViewer {
int totalReduces = 0;
int totalCleanups = 0;
int totalSetups = 0;
+ int numUberSubMaps = 0;
+ int numUberSubReduces = 0;
int numFailedMaps = 0;
int numKilledMaps = 0;
int numFailedReduces = 0;
@@ -544,15 +546,20 @@ public class HistoryViewer {
long cleanupFinished = 0;
long setupStarted = 0;
long setupFinished = 0;
-
+ boolean isUber = false;
+
/** Get total maps */
public int getTotalMaps() { return totalMaps; }
/** Get total reduces */
public int getTotalReduces() { return totalReduces; }
- /** Get number of clean up tasks */
+ /** Get number of cleanup tasks */
public int getTotalCleanups() { return totalCleanups; }
- /** Get number of set up tasks */
+ /** Get number of setup tasks */
public int getTotalSetups() { return totalSetups; }
+ /** Get number of map subtasks within UberTask */
+ public int getNumUberSubMaps() { return numUberSubMaps; }
+ /** Get number of reduce subtasks within UberTask */
+ public int getNumUberSubReduces() { return numUberSubReduces; }
/** Get number of failed maps */
public int getNumFailedMaps() { return numFailedMaps; }
/** Get number of killed maps */
@@ -567,11 +574,11 @@ public class HistoryViewer {
public int getNumFailedCleanups() { return numFailedCleanups; }
/** Get number of killed cleanup tasks */
public int getNumKilledCleanups() { return numKilledCleanups; }
- /** Get number of finished set up tasks */
+ /** Get number of finished setup tasks */
public int getNumFinishedSetups() { return numFinishedSetups; }
- /** Get number of failed set up tasks */
+ /** Get number of failed setup tasks */
public int getNumFailedSetups() { return numFailedSetups; }
- /** Get number of killed set up tasks */
+ /** Get number of killed setup tasks */
public int getNumKilledSetups() { return numKilledSetups; }
/** Get number of maps that were started */
public long getMapStarted() { return mapStarted; }
@@ -589,8 +596,10 @@ public class HistoryViewer {
public long getSetupStarted() { return setupStarted; }
/** Get number of setup tasks that finished */
public long getSetupFinished() { return setupFinished; }
+ /** Get job's UberTask/non-UberTask status */
+ public boolean isUber() { return isUber; }
- /** Create summary information for the parsed job */
+ /** Create summary information for the parsed job */
public SummarizedJob(JobInfo job) {
tasks = job.getAllTasks();