Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar 8 05:53:52 2011
@@ -72,10 +72,8 @@ public class ReduceTask extends Task {
private CompressionCodec codec;
-
{
getProgress().setStatus("reduce");
- setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
private Progress copyPhase;
@@ -121,14 +119,21 @@ public class ReduceTask extends Task {
public ReduceTask() {
super();
+ this.taskStatus = new ReduceTaskStatus();
}
public ReduceTask(String jobFile, TaskAttemptID taskId,
int partition, int numMaps, int numSlotsRequired) {
super(jobFile, taskId, partition, numSlotsRequired);
this.numMaps = numMaps;
+ this.taskStatus = new ReduceTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
+ TaskStatus.State.UNASSIGNED,
+ "", "", "", TaskStatus.Phase.SHUFFLE,
+ getCounters());
}
-
+
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
@@ -151,6 +156,45 @@ public class ReduceTask extends Task {
return false;
}
+ /**
+ * Is this really a combo-task masquerading as a plain MapTask? Decidedly
+ * not.
+ */
+ @Override
+ public boolean isUberTask() {
+ return false;
+ }
+
+ /**
+ * Allow UberTask (or, potentially, JobInProgress or others) to set up a
+ * deeper Progress hierarchy even if run() is skipped. If setProgress()
+ * is also needed, it should be called <I>before</I> createPhase() or else
+ * the sub-phases created here will be wiped out.
+ */
+ void createPhase(TaskStatus.Phase phaseType, String status) {
+ if (phaseType == TaskStatus.Phase.SHUFFLE) {
+ copyPhase = getProgress().addPhase(status);
+ } else if (phaseType == TaskStatus.Phase.SORT) {
+ sortPhase = getProgress().addPhase(status);
+ } else /* TaskStatus.Phase.REDUCE */ {
+ reducePhase = getProgress().addPhase(status);
+ }
+ }
+
+ /**
+ * Allow UberTask to traverse the deeper Progress hierarchy in case run() is
+ * skipped.
+ */
+ void completePhase(TaskStatus.Phase phaseType) {
+ if (phaseType == TaskStatus.Phase.SHUFFLE) {
+ copyPhase.complete();
+ } else if (phaseType == TaskStatus.Phase.SORT) {
+ sortPhase.complete();
+ } else /* TaskStatus.Phase.REDUCE */ {
+ reducePhase.complete();
+ }
+ }
+
public int getNumMaps() { return numMaps; }
/**
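A minimal sketch of the Progress tree that createPhase()/completePhase() maintain, using the stock org.apache.hadoop.util.Progress API: addPhase() appends an equally weighted child phase, complete() marks a child finished, and the root aggregates the children's progress.

    import org.apache.hadoop.util.Progress;

    public class ProgressTreeSketch {
      public static void main(String[] args) {
        Progress root = new Progress();
        root.setStatus("reduce");
        Progress copyPhase   = root.addPhase("copy");
        Progress sortPhase   = root.addPhase("sort");
        Progress reducePhase = root.addPhase("reduce");
        copyPhase.complete();           // shuffle finished
        sortPhase.complete();           // sort finished
        reducePhase.set(0.5f);          // reduce half done
        // equally weighted children: (1.0 + 1.0 + 0.5) / 3
        System.out.println(root.get()); // ~0.8333
      }
    }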
@@ -177,37 +221,39 @@ public class ReduceTask extends Task {
}
// Get the input files for the reducer.
- private Path[] getMapFiles(FileSystem fs, boolean isLocal)
+ static Path[] getMapFiles(ReduceTask reduce, FileSystem fs, boolean isLocal)
throws IOException {
List<Path> fileList = new ArrayList<Path>();
if (isLocal) {
// for local jobs
- for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i));
+ for (int i = 0; i < reduce.numMaps; ++i) {
+ fileList.add(reduce.mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
- for (FileStatus filestatus : mapOutputFilesOnDisk) {
+ for (FileStatus filestatus : reduce.mapOutputFilesOnDisk) {
fileList.add(filestatus.getPath());
}
}
return fileList.toArray(new Path[0]);
}
- private class ReduceValuesIterator<KEY,VALUE>
+ private static class ReduceValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
- public ReduceValuesIterator (RawKeyValueIterator in,
+ ReduceTask reduce;
+ public ReduceValuesIterator (ReduceTask reduce, RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass,
Configuration conf, Progressable reporter)
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
+ this.reduce = reduce;
}
@Override
public VALUE next() {
- reduceInputValueCounter.increment(1);
+ reduce.reduceInputValueCounter.increment(1);
return moveToNext();
}
@@ -216,12 +262,13 @@ public class ReduceTask extends Task {
}
public void informReduceProgress() {
- reducePhase.set(super.in.getProgress().getProgress()); // update progress
+ // update progress:
+ reduce.reducePhase.set(super.in.getProgress().getProgress());
reporter.progress();
}
}
- private class SkippingReduceValuesIterator<KEY,VALUE>
+ private static class SkippingReduceValuesIterator<KEY,VALUE>
extends ReduceValuesIterator<KEY,VALUE> {
private SkipRangeIterator skipIt;
private TaskUmbilicalProtocol umbilical;
@@ -234,26 +281,27 @@ public class ReduceTask extends Task {
private boolean toWriteSkipRecs;
private boolean hasNext;
private TaskReporter reporter;
-
- public SkippingReduceValuesIterator(RawKeyValueIterator in,
+
+ public SkippingReduceValuesIterator(ReduceTask reduce,
+ RawKeyValueIterator in,
RawComparator<KEY> comparator, Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
TaskUmbilicalProtocol umbilical) throws IOException {
- super(in, comparator, keyClass, valClass, conf, reporter);
+ super(reduce, in, comparator, keyClass, valClass, conf, reporter);
this.umbilical = umbilical;
- this.skipGroupCounter =
+ this.skipGroupCounter =
reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
- this.skipRecCounter =
+ this.skipRecCounter =
reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
- this.toWriteSkipRecs = toWriteSkipRecs() &&
+ this.toWriteSkipRecs = reduce.toWriteSkipRecs() &&
SkipBadRecords.getSkipOutputPath(conf)!=null;
this.keyClass = keyClass;
this.valClass = valClass;
this.reporter = reporter;
- skipIt = getSkipRanges().skipRangeIterator();
+ skipIt = reduce.getSkipRanges().skipRangeIterator();
mayBeSkip();
}
-
+
public void nextKey() throws IOException {
super.nextKey();
mayBeSkip();
@@ -292,16 +340,16 @@ public class ReduceTask extends Task {
}
skipGroupCounter.increment(skip);
skipRecCounter.increment(skipRec);
- reportNextRecordRange(umbilical, grpIndex);
+ reduce.reportNextRecordRange(umbilical, grpIndex);
}
@SuppressWarnings("unchecked")
private void writeSkippedRec(KEY key, VALUE value) throws IOException{
if(skipWriter==null) {
- Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
- Path skipFile = new Path(skipDir, getTaskID().toString());
+ Path skipDir = SkipBadRecords.getSkipOutputPath(reduce.conf);
+ Path skipFile = new Path(skipDir, reduce.getTaskID().toString());
skipWriter = SequenceFile.createWriter(
- skipFile.getFileSystem(conf), conf, skipFile,
+ skipFile.getFileSystem(reduce.conf), reduce.conf, skipFile,
keyClass, valClass,
CompressionType.BLOCK, reporter);
}
@@ -363,8 +411,8 @@ public class ReduceTask extends Task {
} else {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), codec,
- getMapFiles(rfs, true),
+ job.getMapOutputValueClass(), codec,
+ getMapFiles(this, rfs, true),
!conf.getKeepFailedTaskFiles(),
job.getInt(JobContext.IO_SORT_FACTOR, 100),
new Path(getTaskID().toString()),
@@ -382,18 +430,40 @@ public class ReduceTask extends Task {
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
- runNewReducer(job, umbilical, reporter, rIter, comparator,
+ runNewReducer(this, job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
- runOldReducer(job, umbilical, reporter, rIter, comparator,
+ runOldReducer(this, job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
done(umbilical, reporter);
}
+ private static class WrappedOutputCollector<OUTKEY, OUTVALUE>
+ implements OutputCollector<OUTKEY, OUTVALUE> {
+ RecordWriter<OUTKEY, OUTVALUE> out;
+ TaskReporter reporter;
+ Counters.Counter reduceOutputCounter;
+ public WrappedOutputCollector(ReduceTask reduce,
+ RecordWriter<OUTKEY, OUTVALUE> out,
+ TaskReporter reporter) {
+ this.out = out;
+ this.reporter = reporter;
+ this.reduceOutputCounter = reduce.reduceOutputCounter;
+ }
+
+ public void collect(OUTKEY key, OUTVALUE value)
+ throws IOException {
+ out.write(key, value);
+ reduceOutputCounter.increment(1);
+ // indicate that progress update needs to be sent
+ reporter.progress();
+ }
+ }
+
@SuppressWarnings("unchecked")
- private <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runOldReducer(JobConf job,
+ static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runOldReducer(ReduceTask reduce, JobConf job,
TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
@@ -403,7 +473,7 @@ public class ReduceTask extends Task {
Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
// make output collector
- String finalName = getOutputName(getPartition());
+ String finalName = getOutputName(reduce.getPartition());
FileSystem fs = FileSystem.get(job);
@@ -411,15 +481,7 @@ public class ReduceTask extends Task {
job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
OutputCollector<OUTKEY,OUTVALUE> collector =
- new OutputCollector<OUTKEY,OUTVALUE>() {
- public void collect(OUTKEY key, OUTVALUE value)
- throws IOException {
- out.write(key, value);
- reduceOutputCounter.increment(1);
- // indicate that progress update needs to be sent
- reporter.progress();
- }
- };
+ new WrappedOutputCollector<OUTKEY, OUTVALUE>(reduce, out, reporter);
// apply reduce function
try {
@@ -427,16 +489,16 @@ public class ReduceTask extends Task {
boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
SkipBadRecords.getAutoIncrReducerProcCount(job);
- ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ?
- new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter,
+ ReduceValuesIterator<INKEY,INVALUE> values = reduce.isSkipping() ?
+ new SkippingReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
comparator, keyClass, valueClass,
job, reporter, umbilical) :
- new ReduceValuesIterator<INKEY,INVALUE>(rIter,
+ new ReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
job.getOutputValueGroupingComparator(), keyClass, valueClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
- reduceInputKeyCounter.increment(1);
+ reduce.reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
@@ -449,7 +511,7 @@ public class ReduceTask extends Task {
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
- //End of clean up.
+ //End of cleanup.
} catch (IOException ioe) {
try {
reducer.close();
@@ -487,9 +549,38 @@ public class ReduceTask extends Task {
}
}
+ private static class WrappedRawKeyValueIterator implements RawKeyValueIterator {
+ ReduceTask reduce;
+ TaskReporter reporter;
+ RawKeyValueIterator rawIter;
+ public WrappedRawKeyValueIterator(ReduceTask reduce, TaskReporter reporter,
+ RawKeyValueIterator rawIter) {
+ this.reduce = reduce;
+ this.rawIter = rawIter;
+ this.reporter = reporter;
+ }
+ public void close() throws IOException {
+ rawIter.close();
+ }
+ public DataInputBuffer getKey() throws IOException {
+ return rawIter.getKey();
+ }
+ public Progress getProgress() {
+ return rawIter.getProgress();
+ }
+ public DataInputBuffer getValue() throws IOException {
+ return rawIter.getValue();
+ }
+ public boolean next() throws IOException {
+ boolean ret = rawIter.next();
+ reporter.setProgress(rawIter.getProgress().getProgress());
+ return ret;
+ }
+ }
+
@SuppressWarnings("unchecked")
- private <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runNewReducer(JobConf job,
+ static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runNewReducer(final ReduceTask reduce, JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
@@ -498,49 +589,29 @@ public class ReduceTask extends Task {
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
+ org.apache.hadoop.mapreduce.TaskAttemptID reduceId = reduce.getTaskID();
// wrap value iterator to report progress.
final RawKeyValueIterator rawIter = rIter;
- rIter = new RawKeyValueIterator() {
- public void close() throws IOException {
- rawIter.close();
- }
- public DataInputBuffer getKey() throws IOException {
- return rawIter.getKey();
- }
- public Progress getProgress() {
- return rawIter.getProgress();
- }
- public DataInputBuffer getValue() throws IOException {
- return rawIter.getValue();
- }
- public boolean next() throws IOException {
- boolean ret = rawIter.next();
- reporter.setProgress(rawIter.getProgress().getProgress());
- return ret;
- }
- };
+ rIter = new WrappedRawKeyValueIterator(reduce, reporter, rawIter);
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
+ new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, reduceId);
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
- outputFormat.getRecordWriter(taskContext);
+ reduce.outputFormat.getRecordWriter(taskContext);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
- new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
- job.setBoolean("mapred.skip.on", isSkipping());
- job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
- org.apache.hadoop.mapreduce.Reducer.Context
- reducerContext = createReduceContext(reducer, job, getTaskID(),
- rIter, reduceInputKeyCounter,
- reduceInputValueCounter,
- trackedRW,
- committer,
- reporter, comparator, keyClass,
- valueClass);
+ new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduce.reduceOutputCounter);
+ job.setBoolean(JobContext.SKIP_RECORDS, reduce.isSkipping());
+ org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+ createReduceContext(reducer, job, reduceId, rIter,
+ reduce.reduceInputKeyCounter,
+ reduce.reduceInputValueCounter,
+ trackedRW, reduce.committer, reporter,
+ comparator, keyClass, valueClass);
reducer.run(reducerContext);
output.close(reducerContext);
}
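The dominant change in ReduceTask.java above is mechanical: helpers that were instance members (ReduceValuesIterator, SkippingReduceValuesIterator, getMapFiles(), runOldReducer(), runNewReducer(), plus the new WrappedOutputCollector and WrappedRawKeyValueIterator) become static and take the owning ReduceTask as an explicit parameter, so an UberTask can drive them against a subtask it holds rather than only from inside a ReduceTask. A minimal sketch of the pattern, with hypothetical names:

    // Hypothetical names; illustrates the refactoring, not Hadoop's classes.
    public class Owner {
      long recordCounter;

      // Before: non-static inner class captures Owner.this implicitly,
      // so only code running inside an Owner can instantiate it.
      class ImplicitWorker {
        void step() { recordCounter++; }
      }

      // After: static nested class with the owner passed in explicitly,
      // so any caller holding an Owner reference can drive it.
      static class ExplicitWorker {
        private final Owner owner;
        ExplicitWorker(Owner owner) { this.owner = owner; }
        void step() { owner.recordCounter++; }
      }
    }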
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Mar 8 05:53:52 2011
@@ -54,6 +54,11 @@ class ReduceTaskStatus extends TaskStatu
}
@Override
+ public boolean getIsUber() {
+ return false;
+ }
+
+ @Override
void setFinishTime(long finishTime) {
if (shuffleFinishTime == 0) {
this.shuffleFinishTime = finishTime;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 05:53:52 2011
@@ -118,6 +118,7 @@ abstract public class Task implements Wr
private String jobFile; // job configuration file
private String user; // user running the job
private TaskAttemptID taskId; // unique, includes job id
+ private TaskAttemptID taskIdForUmbilical; // same, or uber's if subtask
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
protected JobStatus.State jobRunStateForCleanup;
@@ -159,8 +160,8 @@ abstract public class Task implements Wr
////////////////////////////////////////////
public Task() {
- taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
+ taskIdForUmbilical = taskId;
spilledRecordsCounter =
counters.findCounter(TaskCounter.SPILLED_RECORDS);
failedShuffleCounter =
@@ -174,17 +175,10 @@ abstract public class Task implements Wr
int numSlotsRequired) {
this.jobFile = jobFile;
this.taskId = taskId;
+ this.taskIdForUmbilical = taskId;
this.partition = partition;
this.numSlotsRequired = numSlotsRequired;
- this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
- 0.0f, numSlotsRequired,
- TaskStatus.State.UNASSIGNED,
- "", "", "",
- isMapTask() ?
- TaskStatus.Phase.MAP :
- TaskStatus.Phase.SHUFFLE,
- counters);
spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
mergedMapOutputsCounter =
@@ -197,7 +191,12 @@ abstract public class Task implements Wr
////////////////////////////////////////////
public void setJobFile(String jobFile) { this.jobFile = jobFile; }
public String getJobFile() { return jobFile; }
+
public TaskAttemptID getTaskID() { return taskId; }
+ public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) {
+ this.taskIdForUmbilical = taskIdForUmbilical;
+ }
+
public int getNumSlotsRequired() {
return numSlotsRequired;
}
@@ -269,7 +268,7 @@ abstract public class Task implements Wr
/**
* Report a fatal error to the parent (task) tracker.
*/
- protected void reportFatalError(TaskAttemptID id, Throwable throwable,
+ protected void reportFatalError(Throwable throwable,
String logMsg) {
LOG.fatal(logMsg);
Throwable tCause = throwable.getCause();
@@ -277,7 +276,7 @@ abstract public class Task implements Wr
? StringUtils.stringifyException(throwable)
: StringUtils.stringifyException(tCause);
try {
- umbilical.fatalError(id, cause);
+ umbilical.fatalError(taskIdForUmbilical, cause);
} catch (IOException ioe) {
LOG.fatal("Failed to contact the tasktracker", ioe);
System.exit(-1);
@@ -389,6 +388,14 @@ abstract public class Task implements Wr
this.user = user;
}
+ /**
+ * Return the task's MapOutputFile instance.
+ * @return the task's MapOutputFile instance
+ */
+ MapOutputFile getMapOutputFile() {
+ return mapOutputFile;
+ }
+
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -396,6 +403,7 @@ abstract public class Task implements Wr
public void write(DataOutput out) throws IOException {
Text.writeString(out, jobFile);
taskId.write(out);
+ taskIdForUmbilical.write(out);
out.writeInt(partition);
out.writeInt(numSlotsRequired);
taskStatus.write(out);
@@ -414,6 +422,7 @@ abstract public class Task implements Wr
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
+ taskIdForUmbilical = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
@@ -465,7 +474,7 @@ abstract public class Task implements Wr
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
- private transient Progress taskProgress = new Progress();
+ private transient Progress taskProgress = new Progress(); //GRR Q: why transient? lose entire tree every time serialize??
// Current counters
private transient Counters counters = new Counters();
@@ -475,6 +484,21 @@ abstract public class Task implements Wr
public abstract boolean isMapTask();
+ /**
+ * Is this really a combo-task masquerading as a plain MapTask?
+ */
+ public abstract boolean isUberTask();
+
+ /**
+ * This setter allows one to incorporate Tasks into multiple levels of
+ * a Progress addPhase()-generated hierarchy (i.e., not always the root
+ * node), which in turn allows Progress to handle all details of progress
+ * aggregation for an UberTask or even a whole job.
+ */
+ protected void setProgress(Progress progress) {
+ taskProgress = progress;
+ }
+
public Progress getProgress() { return taskProgress; }
public void initialize(JobConf job, JobID id,
@@ -518,7 +542,7 @@ abstract public class Task implements Wr
resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
}
}
-
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
protected class TaskReporter
@@ -561,6 +585,7 @@ abstract public class Task implements Wr
// indicate that progress update needs to be sent
setProgressFlag();
}
+ // FIXME? why isn't this deprecated in favor of public setProgressFlag()?
public void progress() {
// indicate that progress update needs to be sent
setProgressFlag();
@@ -603,15 +628,16 @@ abstract public class Task implements Wr
}
public InputSplit getInputSplit() throws UnsupportedOperationException {
if (split == null) {
- throw new UnsupportedOperationException("Input only available on map");
+ throw new UnsupportedOperationException("Input available only on map");
} else {
return split;
}
}
/**
- * The communication thread handles communication with the parent (Task Tracker).
- * It sends progress updates if progress has been made or if the task needs to
- * let the parent know that it's alive. It also pings the parent to see if it's alive.
+ * The communication thread handles communication with the parent
+ * (TaskTracker). It sends progress updates if progress has been made or
+ * if the task needs to let the parent know that it's alive. It also pings
+ * the parent to see if it's alive.
*/
public void run() {
final int MAX_RETRIES = 3;
@@ -647,8 +673,8 @@ abstract public class Task implements Wr
taskFound = umbilical.ping(taskId);
}
- // if Task Tracker is not aware of our task ID (probably because it died and
- // came back up), kill ourselves
+ // if TaskTracker is not aware of our task ID (probably because it
+ // died and came back up), kill ourselves
if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
@@ -682,7 +708,7 @@ abstract public class Task implements Wr
}
}
}
-
+
/**
* Reports the next executing record range to TaskTracker.
*
@@ -701,7 +727,7 @@ abstract public class Task implements Wr
if (LOG.isDebugEnabled()) {
LOG.debug("sending reportNextRecordRange " + range);
}
- umbilical.reportNextRecordRange(taskId, range);
+ umbilical.reportNextRecordRange(taskIdForUmbilical, range);
}
/**
@@ -837,8 +863,13 @@ abstract public class Task implements Wr
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
- LOG.info("Task:" + taskId + " is done."
- + " And is in the process of commiting");
+ if (isUberTask()) {
+ LOG.info("UberTask:" + taskIdForUmbilical + " subtask:" + taskId
+ + "is done and is in the process of committing.");
+ } else {
+ LOG.info("Task:" + taskId
+ + "is done and is in the process of committing.");
+ }
updateCounters();
boolean commitRequired = isCommitRequired();
@@ -848,7 +879,7 @@ abstract public class Task implements Wr
// say the task tracker that task is commit pending
while (true) {
try {
- umbilical.commitPending(taskId, taskStatus);
+ umbilical.commitPending(taskIdForUmbilical, taskStatus);
break;
} catch (InterruptedException ie) {
// ignore
@@ -899,8 +930,14 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
- LOG.warn("Parent died. Exiting "+taskId);
+ //GRR FIXME (later): alternatives to taskIdForUmbilical would be
+ // (1) include taskId as part of umbilical object and protocol;
+ // (2) include taskId as part of taskStatus
+ // (3) extend TaskAttemptID (or create related Task inner class?) to
+ // include taskAttemptId() and taskAttemptIdForUmbilical() methods
+ // that are overridden in uber context [Dick]
+ if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) {
+ LOG.warn("Parent died. Exiting " + taskId);
System.exit(66);
}
taskStatus.clearStatus();
@@ -936,7 +973,7 @@ abstract public class Task implements Wr
* @return -1 if it can't be found.
*/
private long calculateOutputSize() throws IOException {
- if (!isMapOrReduce()) {
+ if (!isMapOrReduce() || isUberTask()) {
return -1;
}
@@ -956,8 +993,13 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- umbilical.done(getTaskID());
- LOG.info("Task '" + taskId + "' done.");
+ umbilical.done(taskIdForUmbilical);
+ if (isUberTask()) {
+ LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId
+ + "' done.");
+ } else {
+ LOG.info("Task '" + taskId + "' done.");
+ }
return;
} catch (IOException ie) {
LOG.warn("Failure signalling completion: " +
@@ -976,7 +1018,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- while (!umbilical.canCommit(taskId)) {
+ while (!umbilical.canCommit(taskIdForUmbilical)) {
try {
Thread.sleep(1000);
} catch(InterruptedException ie) {
@@ -1033,7 +1075,7 @@ abstract public class Task implements Wr
setPhase(TaskStatus.Phase.CLEANUP);
getProgress().setStatus("cleanup");
statusUpdate(umbilical);
- LOG.info("Runnning cleanup for the task");
+ LOG.info("Running cleanup for the task");
// do the cleanup
committer.abortTask(taskContext);
}
@@ -1058,8 +1100,10 @@ abstract public class Task implements Wr
oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
}
} else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
- LOG.info("Committing job");
- committer.commitJob(jobContext);
+ // delete <outputdir>/_temporary and optionally create _SUCCESS file
+ if (!isUberTask()) { // defer since output files have not yet been saved
+ commitJob();
+ }
} else {
throw new IOException("Invalid state of the job for cleanup. State found "
+ jobRunStateForCleanup + " expecting "
@@ -1068,20 +1112,31 @@ abstract public class Task implements Wr
+ JobStatus.State.KILLED);
}
- // delete the staging area for the job
+ // delete the staging area for the job (e.g.,
+ // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/
+ // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir)
JobConf conf = new JobConf(jobContext.getConfiguration());
if (!supportIsolationRunner(conf)) {
- String jobTempDir = conf.get("mapreduce.job.dir");
- Path jobTempDirPath = new Path(jobTempDir);
- FileSystem fs = jobTempDirPath.getFileSystem(conf);
- fs.delete(jobTempDirPath, true);
+ String jobStagingDir = conf.get("mapreduce.job.dir");
+ Path jobStagingDirPath = new Path(jobStagingDir);
+ FileSystem fs = jobStagingDirPath.getFileSystem(conf);
+ fs.delete(jobStagingDirPath, true);
+ }
+ // update counters, save any pending output files, shut down the progress-
+ // reporter communication thread and the umbilical, and mark the task done
+ if (!isUberTask()) { // defer so UberTask can send TT final update(s)
+ done(umbilical, reporter);
}
- done(umbilical, reporter);
}
-
+
+ protected void commitJob() throws IOException {
+ LOG.info("Committing job");
+ committer.commitJob(jobContext);
+ }
+
protected boolean supportIsolationRunner(JobConf conf) {
- return (conf.getKeepTaskFilesPattern() != null || conf
- .getKeepFailedTaskFiles());
+ return (conf.getKeepTaskFilesPattern() != null ||
+ conf.getKeepFailedTaskFiles());
}
protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
@@ -1090,9 +1145,12 @@ abstract public class Task implements Wr
// do the setup
getProgress().setStatus("setup");
committer.setupJob(jobContext);
- done(umbilical, reporter);
+ if (!isUberTask()) {
+ // UberTask calls done() directly; don't shut down umbilical prematurely
+ done(umbilical, reporter);
+ }
}
-
+
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
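Note that taskIdForUmbilical is serialized immediately after taskId in write()/readFields() above and replaces taskId in the umbilical calls (fatalError, reportNextRecordRange, commitPending, statusUpdate, done, canCommit), so a subtask running inside an UberTask reports to the TaskTracker under the uber attempt's ID. The Writable contract this relies on is that readFields() consumes fields in exactly the order write() emitted them; a self-contained sketch with a hypothetical TaskIds class:

    import java.io.*;

    public class TaskIds {
      String taskId = "attempt_0";
      String taskIdForUmbilical = "attempt_0"; // same, or the uber attempt's id

      void write(DataOutput out) throws IOException {
        out.writeUTF(taskId);
        out.writeUTF(taskIdForUmbilical);      // new field, fixed position
      }

      void readFields(DataInput in) throws IOException {
        taskId = in.readUTF();                 // must match write() order
        taskIdForUmbilical = in.readUTF();
      }

      public static void main(String[] args) throws IOException {
        TaskIds ids = new TaskIds();
        ids.taskIdForUmbilical = "attempt_uber_1";
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ids.write(new DataOutputStream(bos));
        TaskIds copy = new TaskIds();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy.taskIdForUmbilical); // attempt_uber_1
      }
    }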
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar 8 05:53:52 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.Node;
@@ -62,7 +62,7 @@ import org.apache.hadoop.net.Node;
* **************************************************************
*/
class TaskInProgress {
- static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
+ static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
int maxTaskAttempts = 4;
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
@@ -75,14 +75,19 @@ class TaskInProgress {
// Defines the TIP
private String jobFile = null;
- private TaskSplitMetaInfo splitInfo;
+ private TaskSplitMetaInfo[] splitInfo;
private int numMaps;
+ private int numReduces;
private int partition;
private JobTracker jobtracker;
private JobHistory jobHistory;
private TaskID id;
private JobInProgress job;
private final int numSlotsRequired;
+ private boolean jobCleanup = false;
+ private boolean jobSetup = false;
+ private boolean isUber = false;
+ private boolean jobSetupCleanupNeeded = false; // UberTasks only
// Status of the TIP
private int successEventNumber = -1;
@@ -100,8 +105,6 @@ class TaskInProgress {
private long maxSkipRecords = 0;
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
- private boolean jobCleanup = false;
- private boolean jobSetup = false;
private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
@@ -169,7 +172,8 @@ class TaskInProgress {
JobInProgress job, int partition,
int numSlotsRequired) {
this.jobFile = jobFile;
- this.splitInfo = split;
+ this.splitInfo = new TaskSplitMetaInfo[1];
+ this.splitInfo[0] = split;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
@@ -183,7 +187,7 @@ class TaskInProgress {
}
this.user = job.getUser();
}
-
+
/**
* Constructor for ReduceTask
*/
@@ -265,7 +269,36 @@ class TaskInProgress {
}
}
}
-
+
+ /**
+ * Constructor for UberTask
+ */
+ public TaskInProgress(JobID jobid, String jobFile,
+ TaskSplitMetaInfo[] splits,
+ int numMaps, int numReduces,
+ int partition, JobTracker jobtracker, JobConf conf,
+ JobInProgress job, int numSlotsRequired,
+ boolean jobSetupCleanupNeeded) {
+ this.isUber = true;
+ this.jobFile = jobFile;
+ this.splitInfo = splits;
+ this.numMaps = numMaps;
+ this.numReduces = numReduces;
+ this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
+ this.jobtracker = jobtracker;
+ this.job = job;
+ this.conf = conf;
+ this.partition = partition;
+ this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+ this.numSlotsRequired = numSlotsRequired;
+ setMaxTaskAttempts();
+ init(jobid);
+ if (jobtracker != null) {
+ this.jobHistory = jobtracker.getJobHistory();
+ }
+ this.user = job.getUser();
+ }
+
/**
* Set the max number of attempts before we declare a TIP as "failed"
*/
@@ -276,15 +309,6 @@ class TaskInProgress {
this.maxTaskAttempts = conf.getMaxReduceAttempts();
}
}
-
- /**
- * Return the index of the tip within the job, so
- * "task_200707121733_1313_0002_m_012345" would return 12345;
- * @return int the tip index
- */
- public int idWithinJob() {
- return partition;
- }
public boolean isJobCleanupTask() {
return jobCleanup;
@@ -392,19 +416,32 @@ class TaskInProgress {
public JobInProgress getJob() {
return job;
}
+
/**
* Return an ID for this task, not its component taskid-threads
*/
public TaskID getTIPId() {
return this.id;
}
+
/**
- * Whether this is a map task
+ * Whether this is a map task. Note that ubertasks return false here so
+ * they can run in a reduce slot (larger). (Setup and cleanup tasks may
+ * return either true or false.)
*/
public boolean isMapTask() {
- return splitInfo != null;
+ return splitInfo != null && !isUber;
}
-
+
+ /**
+ * Whether this is an ubertask, i.e., a meta-task that contains a handful
+ * of map tasks and (at most) a single reduce task. Note that ubertasks
+ * are seen as reduce tasks in most contexts.
+ */
+ public boolean isUberTask() {
+ return isUber;
+ }
+
/**
* Returns the {@link TaskType} of the {@link TaskAttemptID} passed.
* The type of an attempt is determined by the nature of the task and not its
@@ -929,15 +966,16 @@ class TaskInProgress {
}
/**
- * Get the split locations
+ * Get the split locations
*/
public String[] getSplitLocations() {
+//GRR FIXME? may need to add "( .. || isUberTask())" if ever called for uber (but locations for which split? all of them?)
if (isMapTask() && !jobSetup && !jobCleanup) {
- return splitInfo.getLocations();
+ return splitInfo[0].getLocations();
}
return new String[0];
}
-
+
/**
* Get the Status of the tasks managed by this TIP
*/
@@ -1120,7 +1158,7 @@ class TaskInProgress {
//set this the first time we run a taskAttempt in this TIP
//each Task attempt has its own TaskStatus, which tracks that
- //attempts execStartTime, thus this startTime is TIP wide.
+ //attempt's execStartTime, thus this startTime is TIP wide.
if (0 == execStartTime){
setExecStartTime(lastDispatchTime);
}
@@ -1132,8 +1170,9 @@ class TaskInProgress {
}
/**
- * Adds a previously running task to this tip. This is used in case of
- * jobtracker restarts.
+ * Creates a task or recreates a previously running one and adds it to this
+ * tip. The latter is used in case of jobtracker restarts. This is the
+ * ultimate source of Task objects in a normal Hadoop setup.
*/
public Task addRunningTask(TaskAttemptID taskid,
String taskTracker,
@@ -1147,10 +1186,20 @@ class TaskInProgress {
LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ failedRanges.getIndicesCount());
}
- t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
- numSlotsNeeded);
+ t = new MapTask(jobFile, taskid, partition,
+ splitInfo[0].getSplitIndex(), numSlotsNeeded);
} else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+ if (isUberTask()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Launching actual UberTask (" + numMaps + " maps, "
+ + numReduces + " reduces)");
+ }
+ // numMaps is implicit in size of splitIndex array:
+ t = new UberTask(jobFile, taskid, partition, getSplitIndexArray(),
+ numReduces, numSlotsNeeded, jobSetupCleanupNeeded);
+ } else {
+ t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
+ }
}
if (jobCleanup) {
t.setJobCleanupTask();
@@ -1187,6 +1236,17 @@ class TaskInProgress {
return t;
}
+ // GRR FIXME? more efficient just to pass splitInfo directly...any need for
+ // rest of it in UberTask?
+ TaskSplitIndex[] getSplitIndexArray() {
+ int numSplits = splitInfo.length;
+ TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits];
+ for (int i = 0; i < numSplits; ++i) {
+ splitIndex[i] = splitInfo[i].getSplitIndex();
+ }
+ return splitIndex;
+ }
+
boolean isRunningTask(TaskAttemptID taskid) {
TaskStatus status = taskStatuses.get(taskid);
return status != null && status.getRunState() == TaskStatus.State.RUNNING;
@@ -1236,7 +1296,8 @@ class TaskInProgress {
}
/**
- * Get the id of this map or reduce task.
+ * Get the task index of this map or reduce task. For example,
+ * "task_201011230308_87259_r_000240" would return 240.
* @return The index of this tip in the maps/reduces lists.
*/
public int getIdWithinJob() {
@@ -1256,7 +1317,7 @@ class TaskInProgress {
public int getSuccessEventNumber() {
return successEventNumber;
}
-
+
/**
* Gets the Node list of input split locations sorted in rack order.
*/
@@ -1264,7 +1325,7 @@ class TaskInProgress {
if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
- String[] splits = splitInfo.getLocations();
+ String[] splits = splitInfo[0].getLocations(); // actually replicas
Node[] nodes = new Node[splits.length];
for (int i = 0; i < splits.length; i++) {
nodes[i] = jobtracker.getNode(splits[i]);
@@ -1293,13 +1354,20 @@ class TaskInProgress {
}
public long getMapInputSize() {
- if(isMapTask() && !jobSetup && !jobCleanup) {
- return splitInfo.getInputDataLength();
+ if (isUberTask()) {
+ int numSplits = splitInfo.length;
+ long sumInputDataLength = 0;
+ for (int i = 0; i < numSplits; ++i) {
+ sumInputDataLength += splitInfo[i].getInputDataLength();
+ }
+ return sumInputDataLength;
+ } else if (isMapTask() && !jobSetup && !jobCleanup) {
+ return splitInfo[0].getInputDataLength();
} else {
return 0;
}
}
-
+
/**
* Compare most recent task attempts dispatch time to current system time so
* that task progress rate will slow down as time proceeds even if no progress
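TaskInProgress now stores a TaskSplitMetaInfo[] rather than a single split: the map-task constructor wraps its one split in a length-1 array, the UberTask constructor keeps the whole set, and aggregate queries such as getMapInputSize() simply iterate. A sketch of the unification with hypothetical SplitMeta/Tip types:

    public class SplitAggregation {
      static class SplitMeta {
        final long inputDataLength;
        SplitMeta(long len) { inputDataLength = len; }
      }

      static class Tip {
        final SplitMeta[] splitInfo;
        final boolean isUber;

        Tip(SplitMeta single) {                   // plain map TIP
          this(new SplitMeta[] { single }, false);
        }
        Tip(SplitMeta[] splits, boolean isUber) { // uber TIP
          this.splitInfo = splits;
          this.isUber = isUber;
        }

        long mapInputSize() {                     // one loop serves both
          long sum = 0;
          for (SplitMeta s : splitInfo) sum += s.inputDataLength;
          return sum;
        }
      }

      public static void main(String[] args) {
        Tip map = new Tip(new SplitMeta(64));
        Tip uber = new Tip(new SplitMeta[] {
            new SplitMeta(64), new SplitMeta(32) }, true);
        System.out.println(map.mapInputSize());   // 64
        System.out.println(uber.mapInputSize());  // 96
      }
    }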
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar 8 05:53:52 2011
@@ -41,10 +41,15 @@ public abstract class TaskStatus impleme
static final Log LOG =
LogFactory.getLog(TaskStatus.class.getName());
- //enumeration for reporting current phase of a task.
+ // what kind of TaskStatus is it?
@InterfaceAudience.Private
@InterfaceStability.Unstable
- public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
+ public static enum Type {MAP, REDUCE, UBER}
+
+ // enumeration for reporting current phase of a task.
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static enum Phase {STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
// what state is the task in?
@InterfaceAudience.Private
@@ -105,6 +110,7 @@ public abstract class TaskStatus impleme
public TaskAttemptID getTaskID() { return taskid; }
public abstract boolean getIsMap();
+ public abstract boolean getIsUber();
public int getNumSlots() {
return numSlots;
}
@@ -495,46 +501,61 @@ public abstract class TaskStatus impleme
//////////////////////////////////////////////////////////////////////////////
// Factory-like methods to create/read/write appropriate TaskStatus objects
//////////////////////////////////////////////////////////////////////////////
-
- static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId,
- float progress, int numSlots,
- State runState, String diagnosticInfo,
- String stateString, String taskTracker,
- Phase phase, Counters counters)
- throws IOException {
- boolean isMap = in.readBoolean();
- return createTaskStatus(isMap, taskId, progress, numSlots, runState,
- diagnosticInfo, stateString, taskTracker, phase,
- counters);
- }
-
+
+ // this is the main one used by TT.TIP, JIP, and (apparently) all the
+ // relevant tests (CapacityTestUtils, TestFairScheduler, TestClusterStatus,
+ // TestJobInProgress, TestJobTrackerInstrumentation, FakeObjectUtilities)
+ // [also formerly MapTask and ReduceTask, but no longer]
static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId,
float progress, int numSlots,
State runState, String diagnosticInfo,
String stateString, String taskTracker,
Phase phase, Counters counters) {
return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState,
- diagnosticInfo, stateString, taskTracker,
+ diagnosticInfo, stateString, taskTracker,
phase, counters) :
new ReduceTaskStatus(taskId, progress, numSlots, runState,
diagnosticInfo, stateString,
taskTracker, phase, counters);
}
-
- static TaskStatus createTaskStatus(boolean isMap) {
- return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
+
+ // used only in default ctors of Task (also formerly MapTask, ReduceTask) and
+ // readTaskStatus() below
+ static TaskStatus createTaskStatus(Type tsType) {
+ return (tsType == TaskStatus.Type.MAP)
+ ? new MapTaskStatus()
+ : (tsType == TaskStatus.Type.REDUCE)
+ ? new ReduceTaskStatus()
+ : new UberTaskStatus();
}
static TaskStatus readTaskStatus(DataInput in) throws IOException {
boolean isMap = in.readBoolean();
- TaskStatus taskStatus = createTaskStatus(isMap);
+ boolean isUber = in.readBoolean();
+ Type tsType = isMap
+ ? TaskStatus.Type.MAP
+ : isUber
+ ? TaskStatus.Type.UBER
+ : TaskStatus.Type.REDUCE;
+ TaskStatus taskStatus = createTaskStatus(tsType);
taskStatus.readFields(in);
return taskStatus;
}
static void writeTaskStatus(DataOutput out, TaskStatus taskStatus)
throws IOException {
+/* LATER
+ * //GRR FIXME: longer-term, just store tsType as member var (but then need
+ * // to modify or add new ctor: used in many places)
+ * Type tsType = taskStatus.getIsUber()
+ * ? TaskStatus.Type.UBER
+ * : taskStatus.getIsMap()
+ * ? TaskStatus.Type.MAP
+ * : TaskStatus.Type.REDUCE;
+ * WritableUtils.writeEnum(out, tsType); (or enum.ordinal() [as in MR-901] or ...)
+ */
out.writeBoolean(taskStatus.getIsMap());
+ out.writeBoolean(taskStatus.getIsUber());
taskStatus.write(out);
}
}
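The serialized TaskStatus now leads with two booleans, (isMap, isUber), which readTaskStatus() folds into the three-valued Type exactly as the ternary above does (isMap wins, then isUber). A minimal sketch of that decode:

    public class TypeDecode {
      enum Type { MAP, REDUCE, UBER }

      static Type decode(boolean isMap, boolean isUber) {
        return isMap ? Type.MAP : (isUber ? Type.UBER : Type.REDUCE);
      }

      public static void main(String[] args) {
        System.out.println(decode(true, false));  // MAP
        System.out.println(decode(false, true));  // UBER
        System.out.println(decode(false, false)); // REDUCE
      }
    }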
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 8 05:53:52 2011
@@ -827,7 +827,7 @@ public class TaskTracker
f = rjob.getFetchStatus();
for (TaskInProgress tip : rjob.tasks) {
Task task = tip.getTask();
- if (!task.isMapTask()) {
+ if (!task.isMapTask() && !task.isUberTask()) {
if (((ReduceTask)task).getPhase() ==
TaskStatus.Phase.SHUFFLE) {
if (rjob.getFetchStatus() == null) {
@@ -2502,21 +2502,27 @@ public class TaskTracker
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
- taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
- 0.0f,
- task.getNumSlotsRequired(),
- task.getState(),
- diagnosticInfo.toString(),
- "initializing",
- getName(),
- task.isTaskCleanupTask() ?
- TaskStatus.Phase.CLEANUP :
- task.isMapTask()? TaskStatus.Phase.MAP:
- TaskStatus.Phase.SHUFFLE,
- task.getCounters());
+ if (task.isUberTask()) {
+ taskStatus = new UberTaskStatus(
+ task.getTaskID(), 0.0f,
+ task.getNumSlotsRequired(), task.getState(),
+ diagnosticInfo.toString(), "initializing", getName(),
+ TaskStatus.Phase.MAP, task.getCounters());
+ } else {
+ taskStatus = TaskStatus.createTaskStatus(
+ task.isMapTask(), task.getTaskID(), 0.0f,
+ task.getNumSlotsRequired(), task.getState(),
+ diagnosticInfo.toString(), "initializing", getName(),
+ task.isTaskCleanupTask()
+ ? TaskStatus.Phase.CLEANUP
+ : task.isMapTask()
+ ? TaskStatus.Phase.MAP
+ : TaskStatus.Phase.SHUFFLE,
+ task.getCounters());
+ }
taskTimeout = (10 * 60 * 1000);
}
-
+
void localizeTask(Task task) throws IOException{
FileSystem localFs = FileSystem.getLocal(fConf);
@@ -2633,7 +2639,7 @@ public class TaskTracker
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
LOG.info("Not launching task: " + task.getTaskID() +
- " since it's state is " + this.taskStatus.getRunState());
+ " since its state is " + this.taskStatus.getRunState());
}
}
@@ -3206,7 +3212,7 @@ public class TaskTracker
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
if (!jvmManager.isJvmKnown(jvmId)) {
- LOG.info("Killing unknown JVM " + jvmId);
+ LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME: bug? no (apparent) killing going on here...
return new JvmTask(null, true);
}
RunningJob rjob = runningJobs.get(jvmId.getJobId());
@@ -3237,7 +3243,7 @@ public class TaskTracker
public synchronized boolean statusUpdate(TaskAttemptID taskid,
TaskStatus taskStatus)
throws IOException {
- TaskInProgress tip = tasks.get(taskid);
+ TaskInProgress tip = tasks.get(taskid); // TT.TIP, not TaskInProgress.java
if (tip != null) {
tip.reportProgress(taskStatus);
myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
@@ -3685,17 +3691,15 @@ public class TaskTracker
}
userName = rjob.jobConf.getUser();
}
- // Index file
- Path indexFileName =
- lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- userName, jobId, mapId)
- + "/file.out.index", conf);
-
- // Map-output file
- Path mapOutputFileName =
- lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- userName, jobId, mapId)
- + "/file.out", conf);
+
+ // Map-output filename ("... /file.out")
+ StringBuilder sb = new StringBuilder();
+ sb.append(TaskTracker.getIntermediateOutputDir(userName, jobId, mapId))
+ .append("/").append(MapOutputFile.MAP_OUTPUT_FILENAME_STRING);
+ Path mapOutputFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
+ // Index filename ("... /file.out.index")
+ sb.append(MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+ Path indexFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
/**
* Read the index file to get the information about where the map-output
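The rewritten lookup above exploits the fact that the index file name is just the map-output file name plus a suffix, so a single StringBuilder yields both local paths. A sketch (the directory literal is illustrative; the constant values follow the "file.out" / "file.out.index" names cited in the comments above):

    public class MapOutputPaths {
      static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
      static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";

      public static void main(String[] args) {
        String outputDir = "taskTracker/bob/jobcache/job_1/attempt_1/output";
        StringBuilder sb = new StringBuilder();
        sb.append(outputDir).append("/").append(MAP_OUTPUT_FILENAME_STRING);
        String mapOutputFileName = sb.toString(); // .../file.out
        sb.append(MAP_OUTPUT_INDEX_SUFFIX_STRING);
        String indexFileName = sb.toString();     // .../file.out.index
        System.out.println(mapOutputFileName);
        System.out.println(indexFileName);
      }
    }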
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar 8 05:53:52 2011
@@ -271,4 +271,19 @@ public interface MRJobConfig {
"mapreduce.job.submithostname";
public static final String JOB_SUBMITHOSTADDR =
"mapreduce.job.submithostaddress";
+
+ public static final String JOB_UBERTASK_ENABLE =
+ "mapreduce.job.ubertask.enable";
+ public static final String JOB_UBERTASK_MAXMAPS =
+ "mapreduce.job.ubertask.maxmaps";
+ public static final String JOB_UBERTASK_MAXREDUCES =
+ "mapreduce.job.ubertask.maxreduces";
+ public static final String JOB_UBERTASK_MAXBYTES =
+ "mapreduce.job.ubertask.maxbytes";
+ public static final String UBERTASK_JAVA_OPTS =
+ "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts?
+ public static final String UBERTASK_ULIMIT =
+ "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit?
+ public static final String UBERTASK_ENV =
+ "mapreduce.ubertask.child.env"; // or mapreduce.uber.env?
}
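A sketch of how job initialization might consult the new keys to decide whether a job is small enough to run as a single UberTask; the patch only defines the key names, so the defaults and the byte-threshold semantics below are assumptions:

    import org.apache.hadoop.conf.Configuration;

    public class UberEligibilitySketch {
      static boolean isUberEligible(Configuration conf, int numMaps,
                                    int numReduces, long inputBytes,
                                    long dfsBlockSize) {
        boolean enabled = conf.getBoolean("mapreduce.job.ubertask.enable", false);
        int maxMaps = conf.getInt("mapreduce.job.ubertask.maxmaps", 9);       // assumed default
        int maxReduces = conf.getInt("mapreduce.job.ubertask.maxreduces", 1); // assumed default
        long maxBytes =
            conf.getLong("mapreduce.job.ubertask.maxbytes", dfsBlockSize);    // assumed default
        return enabled && numMaps <= maxMaps && numReduces <= maxReduces
            && inputBytes <= maxBytes;
      }
    }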
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar 8 05:53:52 2011
@@ -64,6 +64,9 @@
{"name": "launchTime", "type": "long"},
{"name": "totalMaps", "type": "int"},
{"name": "totalReduces", "type": "int"},
+ {"name": "isUber", "type": "boolean"},
+ {"name": "numUberSubMaps", "type": "int"},
+ {"name": "numUberSubReduces", "type": "int"},
{"name": "jobStatus", "type": "string"}
]
},
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Tue Mar 8 05:53:52 2011
@@ -526,6 +526,8 @@ public class HistoryViewer {
int totalReduces = 0;
int totalCleanups = 0;
int totalSetups = 0;
+ int numUberSubMaps = 0;
+ int numUberSubReduces = 0;
int numFailedMaps = 0;
int numKilledMaps = 0;
int numFailedReduces = 0;
@@ -544,15 +546,20 @@ public class HistoryViewer {
long cleanupFinished = 0;
long setupStarted = 0;
long setupFinished = 0;
-
+ boolean isUber = false;
+
/** Get total maps */
public int getTotalMaps() { return totalMaps; }
/** Get total reduces */
public int getTotalReduces() { return totalReduces; }
- /** Get number of clean up tasks */
+ /** Get number of cleanup tasks */
public int getTotalCleanups() { return totalCleanups; }
- /** Get number of set up tasks */
+ /** Get number of setup tasks */
public int getTotalSetups() { return totalSetups; }
+ /** Get number of map subtasks within UberTask */
+ public int getNumUberSubMaps() { return numUberSubMaps; }
+ /** Get number of reduce subtasks within UberTask */
+ public int getNumUberSubReduces() { return numUberSubReduces; }
/** Get number of failed maps */
public int getNumFailedMaps() { return numFailedMaps; }
/** Get number of killed maps */
@@ -567,11 +574,11 @@ public class HistoryViewer {
public int getNumFailedCleanups() { return numFailedCleanups; }
/** Get number of killed cleanup tasks */
public int getNumKilledCleanups() { return numKilledCleanups; }
- /** Get number of finished set up tasks */
+ /** Get number of finished setup tasks */
public int getNumFinishedSetups() { return numFinishedSetups; }
- /** Get number of failed set up tasks */
+ /** Get number of failed setup tasks */
public int getNumFailedSetups() { return numFailedSetups; }
- /** Get number of killed set up tasks */
+ /** Get number of killed setup tasks */
public int getNumKilledSetups() { return numKilledSetups; }
/** Get number of maps that were started */
public long getMapStarted() { return mapStarted; }
@@ -589,8 +596,10 @@ public class HistoryViewer {
public long getSetupStarted() { return setupStarted; }
/** Get number of setup tasks that finished */
public long getSetupFinished() { return setupFinished; }
+ /** Get job's UberTask/non-UberTask status */
+ public boolean isUber() { return isUber; }
- /** Create summary information for the parsed job */
+ /** Create summary information for the parsed job */
public SummarizedJob(JobInfo job) {
tasks = job.getAllTasks();
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Mar 8 05:53:52 2011
@@ -302,6 +302,9 @@ public class JobHistoryParser {
info.launchTime = event.getLaunchTime();
info.totalMaps = event.getTotalMaps();
info.totalReduces = event.getTotalReduces();
+ info.isUber = event.getIsUber();
+ info.numUberSubMaps = event.getNumUberSubMaps();
+ info.numUberSubReduces = event.getNumUberSubReduces();
}
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
@@ -334,6 +337,9 @@ public class JobHistoryParser {
long launchTime;
int totalMaps;
int totalReduces;
+ boolean isUber;
+ int numUberSubMaps;
+ int numUberSubReduces;
int failedMaps;
int failedReduces;
int finishedMaps;
@@ -352,8 +358,9 @@ public class JobHistoryParser {
*/
public JobInfo() {
submitTime = launchTime = finishTime = -1;
- totalMaps = totalReduces = failedMaps = failedReduces = 0;
- finishedMaps = finishedReduces = 0;
+ isUber = false;
+ totalMaps = totalReduces = numUberSubMaps = numUberSubReduces = 0;
+ failedMaps = failedReduces = finishedMaps = finishedReduces = 0;
username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
jobACLs = new HashMap<JobACL, AccessControlList>();
@@ -370,6 +377,7 @@ public class JobHistoryParser {
System.out.println("PRIORITY: " + priority);
System.out.println("TOTAL_MAPS: " + totalMaps);
System.out.println("TOTAL_REDUCES: " + totalReduces);
+ //GRR FIXME: add UBER_SUBMAPS and UBER_SUBREDUCES? (or only if isUber == true? coordinate with TaskInfo printAll() changes)
System.out.println("MAP_COUNTERS:" + mapCounters.toString());
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
@@ -395,18 +403,24 @@ public class JobHistoryParser {
public String getJobConfPath() { return jobConfPath; }
/** Get the job launch time */
public long getLaunchTime() { return launchTime; }
- /** Get the total number of maps */
- public long getTotalMaps() { return totalMaps; }
- /** Get the total number of reduces */
- public long getTotalReduces() { return totalReduces; }
+ /** Get the total number of "real" maps */
+ public int getTotalMaps() { return totalMaps; }
+ /** Get the total number of "real" reduces */
+ public int getTotalReduces() { return totalReduces; }
+ /** Was the job small enough to be converted to an UberTask? */
+ public boolean getIsUber() { return isUber; }
+ /** Get the number of sub-MapTasks within the UberTask */
+ public int getNumUberSubMaps() { return numUberSubMaps; }
+ /** Get the number of sub-ReduceTasks within the UberTask */
+ public int getNumUberSubReduces() { return numUberSubReduces; }
/** Get the total number of failed maps */
- public long getFailedMaps() { return failedMaps; }
+ public int getFailedMaps() { return failedMaps; }
/** Get the number of failed reduces */
- public long getFailedReduces() { return failedReduces; }
+ public int getFailedReduces() { return failedReduces; }
/** Get the number of finished maps */
- public long getFinishedMaps() { return finishedMaps; }
+ public int getFinishedMaps() { return finishedMaps; }
/** Get the number of finished reduces */
- public long getFinishedReduces() { return finishedReduces; }
+ public int getFinishedReduces() { return finishedReduces; }
/** Get the job status */
public String getJobStatus() { return jobStatus; }
public String getErrorInfo() { return errorInfo; }
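A sketch of reading the new uber summary back out of a parsed history file, assuming the JobHistoryParser(FileSystem, Path) constructor and parse() entry point present in this tree; the file path comes from the command line:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    public class UberSummary {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        JobInfo info = new JobHistoryParser(fs, new Path(args[0])).parse();
        if (info.getIsUber()) {
          System.out.println("uber job: " + info.getNumUberSubMaps()
              + " sub-maps, " + info.getNumUberSubReduces() + " sub-reduces");
        } else {
          System.out.println("regular job: " + info.getTotalMaps()
              + " maps, " + info.getTotalReduces() + " reduces");
        }
      }
    }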
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Tue Mar 8 05:53:52 2011
@@ -44,11 +44,15 @@ public class JobInitedEvent implements H
* @param jobStatus
*/
public JobInitedEvent(JobID id, long launchTime, int totalMaps,
- int totalReduces, String jobStatus) {
+ int totalReduces, boolean isUber, int numUberSubMaps,
+ int numUberSubReduces, String jobStatus) {
datum.jobid = new Utf8(id.toString());
datum.launchTime = launchTime;
datum.totalMaps = totalMaps;
datum.totalReduces = totalReduces;
+ datum.isUber = isUber;
+ datum.numUberSubMaps = numUberSubMaps;
+ datum.numUberSubReduces = numUberSubReduces;
datum.jobStatus = new Utf8(jobStatus);
}
@@ -61,10 +65,16 @@ public class JobInitedEvent implements H
public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
/** Get the launch time */
public long getLaunchTime() { return datum.launchTime; }
- /** Get the total number of maps */
+ /** Get the total number of "real" maps */
public int getTotalMaps() { return datum.totalMaps; }
- /** Get the total number of reduces */
+ /** Get the total number of "real" reduces */
public int getTotalReduces() { return datum.totalReduces; }
+ /** Was the job small enough to be converted to an UberTask? */
+ public boolean getIsUber() { return datum.isUber; }
+ /** Get the number of sub-MapTasks within the UberTask */
+ public int getNumUberSubMaps() { return datum.numUberSubMaps; }
+ /** Get the number of sub-ReduceTasks within the UberTask */
+ public int getNumUberSubReduces() { return datum.numUberSubReduces; }
/** Get the status */
public String getStatus() { return datum.jobStatus.toString(); }
/** Get the event type */
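
A hedged construction sketch for the widened JobInitedEvent constructor. Every value below is illustrative; in particular, how the "real" task counts relate to the sub-task counts for an uberized job is not established by this hunk:

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;

    public class JobInitedEventExample {
      // All values illustrative; real callers pass their own counts/status.
      static JobInitedEvent makeUberInitedEvent() {
        JobID id = JobID.forName("job_201103080553_0001");
        return new JobInitedEvent(
            id,
            System.currentTimeMillis(), // launchTime
            4,       // totalMaps ("real" maps, per the javadoc above)
            1,       // totalReduces ("real" reduces)
            true,    // isUber
            4,       // numUberSubMaps
            1,       // numUberSubReduces
            "PREP"); // jobStatus
      }
    }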
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Mar 8 05:53:52 2011
@@ -241,8 +241,12 @@ public class TestJobQueueTaskScheduler e
}
@Override
+ public boolean getIsUber() {
+ return t.isUberTask();
+ }
+
+ @Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
-
}
};
status.setRunState(TaskStatus.State.RUNNING);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java Tue Mar 8 05:53:52 2011
@@ -103,7 +103,7 @@ public class TestTaskStatus {
// check the default case
String test = "hi";
final int maxSize = 16;
- TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null,
+ TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null,
null) {
@Override
protected int getMaxStringSize() {
@@ -118,6 +118,11 @@ public class TestTaskStatus {
public boolean getIsMap() {
return false;
}
+
+ @Override
+ public boolean getIsUber() {
+ return false;
+ }
};
assertEquals("Small diagnostic info test failed",
status.getDiagnosticInfo(), test);
@@ -198,6 +203,11 @@ public class TestTaskStatus {
public boolean getIsMap() {
return false;
}
+
+ @Override
+ public boolean getIsUber() {
+ return false;
+ }
};
assertEquals("Large diagnostic info test failed",
maxSize, status.getDiagnosticInfo().length());
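
Both anonymous subclasses in this test now stub getIsUber() alongside getIsMap(), which implies that TaskStatus exposes it as an abstract accessor. The declaration below is the shape this diff suggests, not the actual TaskStatus change (which lives elsewhere in the commit):

    import org.apache.hadoop.io.Writable;

    // Implied shape only -- inferred from the overrides above.
    public abstract class TaskStatus implements Writable, Cloneable {
      public abstract boolean getIsMap();
      public abstract boolean getIsUber();  // new with this patch
      // ... remaining fields and methods unchanged ...
    }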
Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Tue Mar 8 05:53:52 2011
@@ -125,10 +125,15 @@ public class Job20LineHistoryEventEmitte
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
+ // note: UberTask playback not supported since uber data not yet logged
+ boolean isUber = false;
+ int numUberSubMaps = 0;
+ int numUberSubReduces = 0;
if (launchTime != null && totalMaps != null && totalReduces != null) {
- return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
- .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
+ return new JobInitedEvent(jobID, Long.parseLong(launchTime),
+ Integer.parseInt(totalMaps), Integer.parseInt(totalReduces),
+ isUber, numUberSubMaps, numUberSubReduces, status);
}
return null;
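
Should 0.20-format lines ever carry uber data (today they do not, per the note above), the emitter could recover it inside the same method, in the style it already uses for the other fields. The key names here are purely hypothetical:

    // Hypothetical keys -- no real 0.20 history line contains them.
    String isUberStr = line.get("IS_UBER");
    String subMaps   = line.get("UBER_SUB_MAPS");
    String subReds   = line.get("UBER_SUB_REDUCES");
    boolean isUber = (isUberStr != null) && Boolean.parseBoolean(isUberStr);
    int numUberSubMaps = (subMaps == null) ? 0 : Integer.parseInt(subMaps);
    int numUberSubReduces = (subReds == null) ? 0 : Integer.parseInt(subReds);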
Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp Tue Mar 8 05:53:52 2011
@@ -365,10 +365,16 @@
"<th>Killed</th>" +
"<th><a href=\"jobfailures.jsp?jobid=" + jobId +
"\">Failed/Killed<br>Task Attempts</a></th></tr>\n");
- printTaskSummary(out, jobId, "map", status.mapProgress(),
- job.getTasks(TaskType.MAP));
- printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
- job.getTasks(TaskType.REDUCE));
+ if (job.getUberMode()) {
+ /* placeholder until true task- and attempt-level uber info is available */
+ printTaskSummary(out, jobId, "uber", status.reduceProgress(),
+ job.getTasks(TaskType.REDUCE));
+ } else {
+ printTaskSummary(out, jobId, "map", status.mapProgress(),
+ job.getTasks(TaskType.MAP));
+ printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
+ job.getTasks(TaskType.REDUCE));
+ }
out.print("</table>\n");
%>
@@ -421,6 +427,7 @@
%>
</table>
+<%if (job.getTasks(TaskType.MAP).length > 0) { %>
<hr>Map Completion Graph -
<%
if("off".equals(request.getParameter("map.graph"))) {
@@ -442,10 +449,14 @@ if("off".equals(session.getAttribute("ma
width="<%=TaskGraphServlet.width + 2 * TaskGraphServlet.xmargin%>"
height="<%=TaskGraphServlet.height + 3 * TaskGraphServlet.ymargin%>"
style="width:100%" type="image/svg+xml" pluginspage="http://www.adobe.com/svg/viewer/install/" />
-<%}%>
+<%} }%>
-<%if(job.getTasks(TaskType.REDUCE).length > 0) { %>
+<%if (job.getTasks(TaskType.REDUCE).length > 0) { %>
+<%if (job.getUberMode()) { %>
+<hr>UberTask Completion Graph -
+<%} else { %>
<hr>Reduce Completion Graph -
+<%}%>
<%if("off".equals(session.getAttribute("reduce.graph"))) { %>
<a href="/jobdetails.jsp?jobid=<%=jobId%>&refresh=<%=refresh%>&reduce.graph=on" > open </a>
<%} else { %>
Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp?rev=1079191&r1=1079190&r2=1079191&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp Tue Mar 8 05:53:52 2011
@@ -56,6 +56,12 @@
if (job == null) {
return;
}
+ // MR-1220 FIXME (LATER): fully integrating uberization requires task- and
+ // attempt-level JobHistory/Avro/etc. changes to multiple Event types;
+ // for now we go with this top-level hack
+ boolean isUber = job.getIsUber();
+ int numUberSubMaps = job.getNumUberSubMaps();
+ int numUberSubReduces = job.getNumUberSubReduces();
if (job.getJobStatus().equals("FAILED"))
reasonforFailure = job.getErrorInfo();
%>
@@ -90,7 +96,7 @@
<center>
<table border="2" cellpadding="5" cellspacing="2">
<tr>
-<td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
+<td>Kind</td><td>Total Tasks (successful+<wbr>failed+<wbr>killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
</tr>
<tr>
<td>Setup</td>
@@ -106,6 +112,10 @@
<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getSetupFinished(), sj.getSetupStarted()) %></td>
</tr>
<tr>
+
+<%
+ if (!isUber) {
+%>
<td>Map</td>
<td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=MAP&status=all">
<%=sj.getTotalMaps()%></a></td>
@@ -131,6 +141,37 @@
<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %></td>
<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %></td>
</tr>
+
+<%
+ } else /* isUber */ {
+%>
+
+<tr>
+<td>Uber</td>
+ <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=all">
+ <%=sj.getTotalReduces()%></a></td>
+ <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=SUCCEEDED">
+ <%=job.getFinishedReduces()%></a></td>
+ <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=FAILED">
+ <%=sj.getNumFailedReduces()%></a></td>
+ <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=KILLED">
+ <%=sj.getNumKilledReduces()%></a></td>
+ <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %></td>
+ <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %></td>
+</tr>
+<tr>
+<td>Map subtasks</td>
+ <td colspan="6"><%=numUberSubMaps%></td>
+</tr>
+<tr>
+<td>Reduce subtasks</td>
+ <td colspan="6"><%=numUberSubReduces%></td>
+</tr>
+
+<%
+ }
+%>
+
<tr>
<td>Cleanup</td>
<td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=JOB_CLEANUP&status=all">