Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar 8 05:54:02 2011
@@ -72,8 +72,10 @@ public class ReduceTask extends Task {
private CompressionCodec codec;
+
{
getProgress().setStatus("reduce");
+ setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
private Progress copyPhase;
@@ -119,21 +121,14 @@ public class ReduceTask extends Task {
public ReduceTask() {
super();
- this.taskStatus = new ReduceTaskStatus();
}
public ReduceTask(String jobFile, TaskAttemptID taskId,
int partition, int numMaps, int numSlotsRequired) {
super(jobFile, taskId, partition, numSlotsRequired);
this.numMaps = numMaps;
-/*
- */
- this.taskStatus = new ReduceTaskStatus(getTaskID(), 0.0f, numSlotsRequired,
- TaskStatus.State.UNASSIGNED,
- "", "", "", TaskStatus.Phase.SHUFFLE,
- getCounters());
}
-
+
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
@@ -156,45 +151,6 @@ public class ReduceTask extends Task {
return false;
}
- /**
- * Is this really a combo-task masquerading as a plain MapTask? Decidedly
- * not.
- */
- @Override
- public boolean isUberTask() {
- return false;
- }
-
- /**
- * Allow UberTask (or, potentially, JobInProgress or others) to set up a
- * deeper Progress hierarchy even if run() is skipped. If setProgress()
- * is also needed, it should be called <I>before</I> createPhase() or else
- * the sub-phases created here will be wiped out.
- */
- void createPhase(TaskStatus.Phase phaseType, String status) {
- if (phaseType == TaskStatus.Phase.SHUFFLE) {
- copyPhase = getProgress().addPhase(status);
- } else if (phaseType == TaskStatus.Phase.SORT) {
- sortPhase = getProgress().addPhase(status);
- } else /* TaskStatus.Phase.REDUCE */ {
- reducePhase = getProgress().addPhase(status);
- }
- }
-
- /**
- * Allow UberTask to traverse the deeper Progress hierarchy in case run() is
- * skipped.
- */
- void completePhase(TaskStatus.Phase phaseType) {
- if (phaseType == TaskStatus.Phase.SHUFFLE) {
- copyPhase.complete();
- } else if (phaseType == TaskStatus.Phase.SORT) {
- sortPhase.complete();
- } else /* TaskStatus.Phase.REDUCE */ {
- reducePhase.complete();
- }
- }
-
public int getNumMaps() { return numMaps; }
/**
@@ -221,39 +177,37 @@ public class ReduceTask extends Task {
}
// Get the input files for the reducer.
- static Path[] getMapFiles(ReduceTask reduce, FileSystem fs, boolean isLocal)
+ private Path[] getMapFiles(FileSystem fs, boolean isLocal)
throws IOException {
List<Path> fileList = new ArrayList<Path>();
if (isLocal) {
// for local jobs
- for (int i = 0; i < reduce.numMaps; ++i) {
- fileList.add(reduce.mapOutputFile.getInputFile(i));
+ for(int i = 0; i < numMaps; ++i) {
+ fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
- for (FileStatus filestatus : reduce.mapOutputFilesOnDisk) {
+ for (FileStatus filestatus : mapOutputFilesOnDisk) {
fileList.add(filestatus.getPath());
}
}
return fileList.toArray(new Path[0]);
}
- private static class ReduceValuesIterator<KEY,VALUE>
+ private class ReduceValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
- ReduceTask reduce;
- public ReduceValuesIterator (ReduceTask reduce, RawKeyValueIterator in,
+ public ReduceValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass,
Configuration conf, Progressable reporter)
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
- this.reduce = reduce;
}
@Override
public VALUE next() {
- reduce.reduceInputValueCounter.increment(1);
+ reduceInputValueCounter.increment(1);
return moveToNext();
}
@@ -262,13 +216,12 @@ public class ReduceTask extends Task {
}
public void informReduceProgress() {
- // update progress:
- reduce.reducePhase.set(super.in.getProgress().getProgress());
+ reducePhase.set(super.in.getProgress().getProgress()); // update progress
reporter.progress();
}
}
- private static class SkippingReduceValuesIterator<KEY,VALUE>
+ private class SkippingReduceValuesIterator<KEY,VALUE>
extends ReduceValuesIterator<KEY,VALUE> {
private SkipRangeIterator skipIt;
private TaskUmbilicalProtocol umbilical;
@@ -281,27 +234,26 @@ public class ReduceTask extends Task {
private boolean toWriteSkipRecs;
private boolean hasNext;
private TaskReporter reporter;
-
- public SkippingReduceValuesIterator(ReduceTask reduce,
- RawKeyValueIterator in,
+
+ public SkippingReduceValuesIterator(RawKeyValueIterator in,
RawComparator<KEY> comparator, Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
TaskUmbilicalProtocol umbilical) throws IOException {
- super(reduce, in, comparator, keyClass, valClass, conf, reporter);
+ super(in, comparator, keyClass, valClass, conf, reporter);
this.umbilical = umbilical;
- this.skipGroupCounter =
+ this.skipGroupCounter =
reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
- this.skipRecCounter =
+ this.skipRecCounter =
reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
- this.toWriteSkipRecs = reduce.toWriteSkipRecs() &&
+ this.toWriteSkipRecs = toWriteSkipRecs() &&
SkipBadRecords.getSkipOutputPath(conf)!=null;
this.keyClass = keyClass;
this.valClass = valClass;
this.reporter = reporter;
- skipIt = reduce.getSkipRanges().skipRangeIterator();
+ skipIt = getSkipRanges().skipRangeIterator();
mayBeSkip();
}
-
+
public void nextKey() throws IOException {
super.nextKey();
mayBeSkip();
@@ -340,16 +292,16 @@ public class ReduceTask extends Task {
}
skipGroupCounter.increment(skip);
skipRecCounter.increment(skipRec);
- reduce.reportNextRecordRange(umbilical, grpIndex);
+ reportNextRecordRange(umbilical, grpIndex);
}
@SuppressWarnings("unchecked")
private void writeSkippedRec(KEY key, VALUE value) throws IOException{
if(skipWriter==null) {
- Path skipDir = SkipBadRecords.getSkipOutputPath(reduce.conf);
- Path skipFile = new Path(skipDir, reduce.getTaskID().toString());
+ Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+ Path skipFile = new Path(skipDir, getTaskID().toString());
skipWriter = SequenceFile.createWriter(
- skipFile.getFileSystem(reduce.conf), reduce.conf, skipFile,
+ skipFile.getFileSystem(conf), conf, skipFile,
keyClass, valClass,
CompressionType.BLOCK, reporter);
}
@@ -411,8 +363,8 @@ public class ReduceTask extends Task {
} else {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), codec,
- getMapFiles(this, rfs, true),
+ job.getMapOutputValueClass(), codec,
+ getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(),
job.getInt(JobContext.IO_SORT_FACTOR, 100),
new Path(getTaskID().toString()),
@@ -430,40 +382,18 @@ public class ReduceTask extends Task {
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
- runNewReducer(this, job, umbilical, reporter, rIter, comparator,
+ runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
- runOldReducer(this, job, umbilical, reporter, rIter, comparator,
+ runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
done(umbilical, reporter);
}
- private static class WrappedOutputCollector<OUTKEY, OUTVALUE>
- implements OutputCollector<OUTKEY, OUTVALUE> {
- RecordWriter<OUTKEY, OUTVALUE> out;
- TaskReporter reporter;
- Counters.Counter reduceOutputCounter;
- public WrappedOutputCollector(ReduceTask reduce,
- RecordWriter<OUTKEY, OUTVALUE> out,
- TaskReporter reporter) {
- this.out = out;
- this.reporter = reporter;
- this.reduceOutputCounter = reduce.reduceOutputCounter;
- }
-
- public void collect(OUTKEY key, OUTVALUE value)
- throws IOException {
- out.write(key, value);
- reduceOutputCounter.increment(1);
- // indicate that progress update needs to be sent
- reporter.progress();
- }
- }
-
@SuppressWarnings("unchecked")
- static <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runOldReducer(ReduceTask reduce, JobConf job,
+ private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runOldReducer(JobConf job,
TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
@@ -473,7 +403,7 @@ public class ReduceTask extends Task {
Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
// make output collector
- String finalName = getOutputName(reduce.getPartition());
+ String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
@@ -481,7 +411,15 @@ public class ReduceTask extends Task {
job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
OutputCollector<OUTKEY,OUTVALUE> collector =
- new WrappedOutputCollector<OUTKEY, OUTVALUE>(reduce, out, reporter);
+ new OutputCollector<OUTKEY,OUTVALUE>() {
+ public void collect(OUTKEY key, OUTVALUE value)
+ throws IOException {
+ out.write(key, value);
+ reduceOutputCounter.increment(1);
+ // indicate that progress update needs to be sent
+ reporter.progress();
+ }
+ };
// apply reduce function
try {
@@ -489,16 +427,16 @@ public class ReduceTask extends Task {
boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
SkipBadRecords.getAutoIncrReducerProcCount(job);
- ReduceValuesIterator<INKEY,INVALUE> values = reduce.isSkipping() ?
- new SkippingReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
+ ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ?
+ new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter,
comparator, keyClass, valueClass,
job, reporter, umbilical) :
- new ReduceValuesIterator<INKEY,INVALUE>(reduce, rIter,
+ new ReduceValuesIterator<INKEY,INVALUE>(rIter,
job.getOutputValueGroupingComparator(), keyClass, valueClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
- reduce.reduceInputKeyCounter.increment(1);
+ reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
@@ -511,7 +449,7 @@ public class ReduceTask extends Task {
//Clean up: repeated in catch block below
reducer.close();
out.close(reporter);
- //End of cleanup.
+ //End of clean up.
} catch (IOException ioe) {
try {
reducer.close();
@@ -549,38 +487,9 @@ public class ReduceTask extends Task {
}
}
- private static class WrappedRawKeyValueIterator implements RawKeyValueIterator {
- ReduceTask reduce;
- TaskReporter reporter;
- RawKeyValueIterator rawIter;
- public WrappedRawKeyValueIterator(ReduceTask reduce, TaskReporter reporter,
- RawKeyValueIterator rawIter) {
- this.reduce = reduce;
- this.rawIter = rawIter;
- this.reporter = reporter;
- }
- public void close() throws IOException {
- rawIter.close();
- }
- public DataInputBuffer getKey() throws IOException {
- return rawIter.getKey();
- }
- public Progress getProgress() {
- return rawIter.getProgress();
- }
- public DataInputBuffer getValue() throws IOException {
- return rawIter.getValue();
- }
- public boolean next() throws IOException {
- boolean ret = rawIter.next();
- reporter.setProgress(rawIter.getProgress().getProgress());
- return ret;
- }
- }
-
@SuppressWarnings("unchecked")
- static <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runNewReducer(final ReduceTask reduce, JobConf job,
+ private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+ void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
@@ -589,29 +498,49 @@ public class ReduceTask extends Task {
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
- org.apache.hadoop.mapreduce.TaskAttemptID reduceId = reduce.getTaskID();
// wrap value iterator to report progress.
final RawKeyValueIterator rawIter = rIter;
- rIter = new WrappedRawKeyValueIterator(reduce, reporter, rawIter);
+ rIter = new RawKeyValueIterator() {
+ public void close() throws IOException {
+ rawIter.close();
+ }
+ public DataInputBuffer getKey() throws IOException {
+ return rawIter.getKey();
+ }
+ public Progress getProgress() {
+ return rawIter.getProgress();
+ }
+ public DataInputBuffer getValue() throws IOException {
+ return rawIter.getValue();
+ }
+ public boolean next() throws IOException {
+ boolean ret = rawIter.next();
+ reporter.setProgress(rawIter.getProgress().getProgress());
+ return ret;
+ }
+ };
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, reduceId);
+ new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
(org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
- reduce.outputFormat.getRecordWriter(taskContext);
+ outputFormat.getRecordWriter(taskContext);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
- new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduce.reduceOutputCounter);
- job.setBoolean(JobContext.SKIP_RECORDS, reduce.isSkipping());
- org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
- createReduceContext(reducer, job, reduceId, rIter,
- reduce.reduceInputKeyCounter,
- reduce.reduceInputValueCounter,
- trackedRW, reduce.committer, reporter,
- comparator, keyClass, valueClass);
+ new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
+ job.setBoolean("mapred.skip.on", isSkipping());
+ job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
+ org.apache.hadoop.mapreduce.Reducer.Context
+ reducerContext = createReduceContext(reducer, job, getTaskID(),
+ rIter, reduceInputKeyCounter,
+ reduceInputValueCounter,
+ trackedRW,
+ committer,
+ reporter, comparator, keyClass,
+ valueClass);
reducer.run(reducerContext);
output.close(reducerContext);
}
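
The hunks above replace the named WrappedOutputCollector and WrappedRawKeyValueIterator helpers with anonymous inner classes that close over the task's own fields. The shape of that pattern, reduced to a self-contained sketch (the Iterator-based names below are illustrative stand-ins, not the Hadoop types):

import java.util.Iterator;

// Delegation plus a side effect, as in the anonymous RawKeyValueIterator
// above: forward every call, and piggyback a progress report (or a counter
// increment, as in the anonymous OutputCollector) on the hot method.
final class ProgressReportingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final Runnable onAdvance;  // stands in for reporter.setProgress(...)

  ProgressReportingIterator(Iterator<T> delegate, Runnable onAdvance) {
    this.delegate = delegate;
    this.onAdvance = onAdvance;
  }

  @Override public boolean hasNext() { return delegate.hasNext(); }

  @Override public T next() {
    T value = delegate.next();  // the delegate does the real work
    onAdvance.run();            // the side effect rides along on each record
    return value;
  }
}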
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Mar 8 05:54:02 2011
@@ -54,11 +54,6 @@ class ReduceTaskStatus extends TaskStatu
}
@Override
- public boolean getIsUber() {
- return false;
- }
-
- @Override
void setFinishTime(long finishTime) {
if (shuffleFinishTime == 0) {
this.shuffleFinishTime = finishTime;
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 05:54:02 2011
@@ -118,7 +118,6 @@ abstract public class Task implements Wr
private String jobFile; // job configuration file
private String user; // user running the job
private TaskAttemptID taskId; // unique, includes job id
- private TaskAttemptID taskIdForUmbilical; // same, or uber's if subtask
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
protected JobStatus.State jobRunStateForCleanup;
@@ -160,8 +159,8 @@ abstract public class Task implements Wr
////////////////////////////////////////////
public Task() {
+ taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
- taskIdForUmbilical = taskId;
spilledRecordsCounter =
counters.findCounter(TaskCounter.SPILLED_RECORDS);
failedShuffleCounter =
@@ -175,10 +174,17 @@ abstract public class Task implements Wr
int numSlotsRequired) {
this.jobFile = jobFile;
this.taskId = taskId;
- this.taskIdForUmbilical = taskId;
this.partition = partition;
this.numSlotsRequired = numSlotsRequired;
+ this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId,
+ 0.0f, numSlotsRequired,
+ TaskStatus.State.UNASSIGNED,
+ "", "", "",
+ isMapTask() ?
+ TaskStatus.Phase.MAP :
+ TaskStatus.Phase.SHUFFLE,
+ counters);
spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
mergedMapOutputsCounter =
@@ -191,12 +197,7 @@ abstract public class Task implements Wr
////////////////////////////////////////////
public void setJobFile(String jobFile) { this.jobFile = jobFile; }
public String getJobFile() { return jobFile; }
-
public TaskAttemptID getTaskID() { return taskId; }
- public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) {
- this.taskIdForUmbilical = taskIdForUmbilical;
- }
-
public int getNumSlotsRequired() {
return numSlotsRequired;
}
@@ -268,7 +269,7 @@ abstract public class Task implements Wr
/**
* Report a fatal error to the parent (task) tracker.
*/
- protected void reportFatalError(Throwable throwable,
+ protected void reportFatalError(TaskAttemptID id, Throwable throwable,
String logMsg) {
LOG.fatal(logMsg);
Throwable tCause = throwable.getCause();
@@ -276,7 +277,7 @@ abstract public class Task implements Wr
? StringUtils.stringifyException(throwable)
: StringUtils.stringifyException(tCause);
try {
- umbilical.fatalError(taskIdForUmbilical, cause);
+ umbilical.fatalError(id, cause);
} catch (IOException ioe) {
LOG.fatal("Failed to contact the tasktracker", ioe);
System.exit(-1);
@@ -388,14 +389,6 @@ abstract public class Task implements Wr
this.user = user;
}
- /**
- * Return the task's MapOutputFile instance.
- * @return the task's MapOutputFile instance
- */
- MapOutputFile getMapOutputFile() {
- return mapOutputFile;
- }
-
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -403,7 +396,6 @@ abstract public class Task implements Wr
public void write(DataOutput out) throws IOException {
Text.writeString(out, jobFile);
taskId.write(out);
- taskIdForUmbilical.write(out);
out.writeInt(partition);
out.writeInt(numSlotsRequired);
taskStatus.write(out);
@@ -422,7 +414,6 @@ abstract public class Task implements Wr
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
- taskIdForUmbilical = TaskAttemptID.read(in);
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
@@ -474,7 +465,7 @@ abstract public class Task implements Wr
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
- private transient Progress taskProgress = new Progress(); //GRR Q: why transient? lose entire tree every time serialize??
+ private transient Progress taskProgress = new Progress();
// Current counters
private transient Counters counters = new Counters();
@@ -484,21 +475,6 @@ abstract public class Task implements Wr
public abstract boolean isMapTask();
- /**
- * Is this really a combo-task masquerading as a plain MapTask?
- */
- public abstract boolean isUberTask();
-
- /**
- * This setter allows one to incorporate Tasks into multiple levels of
- * a Progress addPhase()-generated hierarchy (i.e., not always the root
- * node), which in turn allows Progress to handle all details of progress
- * aggregation for an UberTask or even a whole job.
- */
- protected void setProgress(Progress progress) {
- taskProgress = progress;
- }
-
public Progress getProgress() { return taskProgress; }
public void initialize(JobConf job, JobID id,
@@ -542,7 +518,7 @@ abstract public class Task implements Wr
resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
}
}
-
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
protected class TaskReporter
@@ -585,7 +561,6 @@ abstract public class Task implements Wr
// indicate that progress update needs to be sent
setProgressFlag();
}
- // FIXME? why isn't this deprecated in favor of public setProgressFlag()?
public void progress() {
// indicate that progress update needs to be sent
setProgressFlag();
@@ -628,16 +603,15 @@ abstract public class Task implements Wr
}
public InputSplit getInputSplit() throws UnsupportedOperationException {
if (split == null) {
- throw new UnsupportedOperationException("Input available only on map");
+ throw new UnsupportedOperationException("Input only available on map");
} else {
return split;
}
}
/**
- * The communication thread handles communication with the parent
- * (TaskTracker). It sends progress updates if progress has been made or
- * if the task needs to let the parent know that it's alive. It also pings
- * the parent to see if it's alive.
+ * The communication thread handles communication with the parent (Task Tracker).
+ * It sends progress updates if progress has been made or if the task needs to
+ * let the parent know that it's alive. It also pings the parent to see if it's alive.
*/
public void run() {
final int MAX_RETRIES = 3;
@@ -673,8 +647,8 @@ abstract public class Task implements Wr
taskFound = umbilical.ping(taskId);
}
- // if TaskTracker is not aware of our task ID (probably because it
- // died and came back up), kill ourselves
+ // if Task Tracker is not aware of our task ID (probably because it died and
+ // came back up), kill ourselves
if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
@@ -708,7 +682,7 @@ abstract public class Task implements Wr
}
}
}
-
+
/**
* Reports the next executing record range to TaskTracker.
*
@@ -727,7 +701,7 @@ abstract public class Task implements Wr
if (LOG.isDebugEnabled()) {
LOG.debug("sending reportNextRecordRange " + range);
}
- umbilical.reportNextRecordRange(taskIdForUmbilical, range);
+ umbilical.reportNextRecordRange(taskId, range);
}
/**
@@ -863,13 +837,8 @@ abstract public class Task implements Wr
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
- if (isUberTask()) {
- LOG.info("UberTask:" + taskIdForUmbilical + " subtask:" + taskId
- + "is done and is in the process of committing.");
- } else {
- LOG.info("Task:" + taskId
- + "is done and is in the process of committing.");
- }
+ LOG.info("Task:" + taskId + " is done."
+ + " And is in the process of commiting");
updateCounters();
boolean commitRequired = isCommitRequired();
@@ -879,7 +848,7 @@ abstract public class Task implements Wr
// say the task tracker that task is commit pending
while (true) {
try {
- umbilical.commitPending(taskIdForUmbilical, taskStatus);
+ umbilical.commitPending(taskId, taskStatus);
break;
} catch (InterruptedException ie) {
// ignore
@@ -930,14 +899,8 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- //GRR FIXME (later): alternatives to taskIdForUmbilical would be
- // (1) include taskId as part of umbilical object and protocol;
- // (2) include taskId as part of taskStatus
- // (3) extend TaskAttemptID (or create related Task inner class?) to
- // include taskAttemptId() and taskAtteptIdForUmbilical() method
- // that's overridden in uber context [Dick]
- if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) {
- LOG.warn("Parent died. Exiting " + taskId);
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
}
taskStatus.clearStatus();
@@ -973,7 +936,7 @@ abstract public class Task implements Wr
* @return -1 if it can't be found.
*/
private long calculateOutputSize() throws IOException {
- if (!isMapOrReduce() || isUberTask()) {
+ if (!isMapOrReduce()) {
return -1;
}
@@ -993,13 +956,8 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- umbilical.done(taskIdForUmbilical);
- if (isUberTask()) {
- LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId
- + "' done.");
- } else {
- LOG.info("Task '" + taskId + "' done.");
- }
+ umbilical.done(getTaskID());
+ LOG.info("Task '" + taskId + "' done.");
return;
} catch (IOException ie) {
LOG.warn("Failure signalling completion: " +
@@ -1018,7 +976,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- while (!umbilical.canCommit(taskIdForUmbilical)) {
+ while (!umbilical.canCommit(taskId)) {
try {
Thread.sleep(1000);
} catch(InterruptedException ie) {
@@ -1075,7 +1033,7 @@ abstract public class Task implements Wr
setPhase(TaskStatus.Phase.CLEANUP);
getProgress().setStatus("cleanup");
statusUpdate(umbilical);
- LOG.info("Running cleanup for the task");
+ LOG.info("Runnning cleanup for the task");
// do the cleanup
committer.abortTask(taskContext);
}
@@ -1100,10 +1058,8 @@ abstract public class Task implements Wr
oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
}
} else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
- // delete <outputdir>/_temporary and optionally create _SUCCESS file
- if (!isUberTask()) { // defer since output files have not yet been saved
- commitJob();
- }
+ LOG.info("Committing job");
+ committer.commitJob(jobContext);
} else {
throw new IOException("Invalid state of the job for cleanup. State found "
+ jobRunStateForCleanup + " expecting "
@@ -1112,31 +1068,20 @@ abstract public class Task implements Wr
+ JobStatus.State.KILLED);
}
- // delete the staging area for the job (e.g.,
- // "hdfs://localhost:9000/tmp/hadoop-<user>/mapred/staging/<user>/.staging/
- // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir)
+ // delete the staging area for the job
JobConf conf = new JobConf(jobContext.getConfiguration());
if (!supportIsolationRunner(conf)) {
- String jobStagingDir = conf.get("mapreduce.job.dir");
- Path jobStagingDirPath = new Path(jobStagingDir);
- FileSystem fs = jobStagingDirPath.getFileSystem(conf);
- fs.delete(jobStagingDirPath, true);
+ String jobTempDir = conf.get("mapreduce.job.dir");
+ Path jobTempDirPath = new Path(jobTempDir);
+ FileSystem fs = jobTempDirPath.getFileSystem(conf);
+ fs.delete(jobTempDirPath, true);
}
- // update counters, save any pending output files, shut down the progress-
- // reporter communication thread and the umbilical, and mark the task done
- if (!isUberTask()) { // defer so UberTask can send TT final update(s)
- done(umbilical, reporter);
- }
- }
-
- protected void commitJob() throws IOException {
- LOG.info("Committing job");
- committer.commitJob(jobContext);
+ done(umbilical, reporter);
}
-
+
protected boolean supportIsolationRunner(JobConf conf) {
- return (conf.getKeepTaskFilesPattern() != null ||
- conf.getKeepFailedTaskFiles());
+ return (conf.getKeepTaskFilesPattern() != null || conf
+ .getKeepFailedTaskFiles());
}
protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
@@ -1145,12 +1090,9 @@ abstract public class Task implements Wr
// do the setup
getProgress().setStatus("setup");
committer.setupJob(jobContext);
- if (!isUberTask()) {
- // UberTask calls done() directly; don't shut down umbilical prematurely
- done(umbilical, reporter);
- }
+ done(umbilical, reporter);
}
-
+
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
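
With taskIdForUmbilical gone, both Task constructors above now obtain their status from the TaskStatus.createTaskStatus factory, and the initial phase follows the task type. Distilled to a compilable sketch (the enum mirrors the diff; the wrapper class is hypothetical):

// Mirrors the isMapTask() ? Phase.MAP : Phase.SHUFFLE selection in the
// Task constructor above: a map task starts in MAP, while a reduce task
// starts in SHUFFLE, since its first real work is fetching map outputs.
enum Phase { STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP }

final class InitialPhase {
  static Phase forTask(boolean isMap) {
    return isMap ? Phase.MAP : Phase.SHUFFLE;
  }

  private InitialPhase() {}  // static helper only
}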
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar 8 05:54:02 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.Node;
@@ -62,7 +62,7 @@ import org.apache.hadoop.net.Node;
* **************************************************************
*/
class TaskInProgress {
- static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
+ static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently
int maxTaskAttempts = 4;
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
@@ -75,19 +75,14 @@ class TaskInProgress {
// Defines the TIP
private String jobFile = null;
- private TaskSplitMetaInfo[] splitInfo;
+ private TaskSplitMetaInfo splitInfo;
private int numMaps;
- private int numReduces;
private int partition;
private JobTracker jobtracker;
private JobHistory jobHistory;
private TaskID id;
private JobInProgress job;
private final int numSlotsRequired;
- private boolean jobCleanup = false;
- private boolean jobSetup = false;
- private boolean isUber = false;
- private boolean jobSetupCleanupNeeded = false; // UberTasks only
// Status of the TIP
private int successEventNumber = -1;
@@ -105,6 +100,8 @@ class TaskInProgress {
private long maxSkipRecords = 0;
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
+ private boolean jobCleanup = false;
+ private boolean jobSetup = false;
private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
@@ -172,8 +169,7 @@ class TaskInProgress {
JobInProgress job, int partition,
int numSlotsRequired) {
this.jobFile = jobFile;
- this.splitInfo = new TaskSplitMetaInfo[1];
- this.splitInfo[0] = split;
+ this.splitInfo = split;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
@@ -187,7 +183,7 @@ class TaskInProgress {
}
this.user = job.getUser();
}
-
+
/**
* Constructor for ReduceTask
*/
@@ -269,36 +265,7 @@ class TaskInProgress {
}
}
}
-
- /**
- * Constructor for UberTask
- */
- public TaskInProgress(JobID jobid, String jobFile,
- TaskSplitMetaInfo[] splits,
- int numMaps, int numReduces,
- int partition, JobTracker jobtracker, JobConf conf,
- JobInProgress job, int numSlotsRequired,
- boolean jobSetupCleanupNeeded) {
- this.isUber = true;
- this.jobFile = jobFile;
- this.splitInfo = splits;
- this.numMaps = numMaps;
- this.numReduces = numReduces;
- this.jobSetupCleanupNeeded = jobSetupCleanupNeeded;
- this.jobtracker = jobtracker;
- this.job = job;
- this.conf = conf;
- this.partition = partition;
- this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
- this.numSlotsRequired = numSlotsRequired;
- setMaxTaskAttempts();
- init(jobid);
- if (jobtracker != null) {
- this.jobHistory = jobtracker.getJobHistory();
- }
- this.user = job.getUser();
- }
-
+
/**
* Set the max number of attempts before we declare a TIP as "failed"
*/
@@ -309,6 +276,15 @@ class TaskInProgress {
this.maxTaskAttempts = conf.getMaxReduceAttempts();
}
}
+
+ /**
+ * Return the index of the tip within the job, so
+ * "task_200707121733_1313_0002_m_012345" would return 12345;
+ * @return int the tip index
+ */
+ public int idWithinJob() {
+ return partition;
+ }
public boolean isJobCleanupTask() {
return jobCleanup;
@@ -416,32 +392,19 @@ class TaskInProgress {
public JobInProgress getJob() {
return job;
}
-
/**
* Return an ID for this task, not its component taskid-threads
*/
public TaskID getTIPId() {
return this.id;
}
-
/**
- * Whether this is a map task. Note that ubertasks return false here so
- * they can run in a reduce slot (larger). (Setup and cleanup tasks may
- * return either true or false.)
+ * Whether this is a map task
*/
public boolean isMapTask() {
- return splitInfo != null && !isUber;
+ return splitInfo != null;
}
-
- /**
- * Whether this is an ubertask, i.e., a meta-task that contains a handful
- * of map tasks and (at most) a single reduce task. Note that ubertasks
- * are seen as reduce tasks in most contexts.
- */
- public boolean isUberTask() {
- return isUber;
- }
-
+
/**
* Returns the {@link TaskType} of the {@link TaskAttemptID} passed.
* The type of an attempt is determined by the nature of the task and not its
@@ -966,16 +929,15 @@ class TaskInProgress {
}
/**
- * Get the split locations
+ * Get the split locations
*/
public String[] getSplitLocations() {
-//GRR FIXME? may need to add "( .. || isUberTask())" if ever called for uber (but locations for which split? all of them?)
if (isMapTask() && !jobSetup && !jobCleanup) {
- return splitInfo[0].getLocations();
+ return splitInfo.getLocations();
}
return new String[0];
}
-
+
/**
* Get the Status of the tasks managed by this TIP
*/
@@ -1158,7 +1120,7 @@ class TaskInProgress {
//set this the first time we run a taskAttempt in this TIP
//each Task attempt has its own TaskStatus, which tracks that
- //attempt's execStartTime, thus this startTime is TIP wide.
+ //attempt's execStartTime, thus this startTime is TIP wide.
if (0 == execStartTime){
setExecStartTime(lastDispatchTime);
}
@@ -1170,9 +1132,8 @@ class TaskInProgress {
}
/**
- * Creates a task or recreates a previously running one and adds it to this
- * tip. The latter is used in case of jobtracker restarts. This is the
- * ultimate source of Task objects in a normal Hadoop setup.
+ * Adds a previously running task to this tip. This is used in case of
+ * jobtracker restarts.
*/
public Task addRunningTask(TaskAttemptID taskid,
String taskTracker,
@@ -1186,20 +1147,10 @@ class TaskInProgress {
LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ failedRanges.getIndicesCount());
}
- t = new MapTask(jobFile, taskid, partition,
- splitInfo[0].getSplitIndex(), numSlotsNeeded);
+ t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
+ numSlotsNeeded);
} else {
- if (isUberTask()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Launching actual UberTask (" + numMaps + " maps, "
- + numReduces + " reduces)");
- }
- // numMaps is implicit in size of splitIndex array:
- t = new UberTask(jobFile, taskid, partition, getSplitIndexArray(),
- numReduces, numSlotsNeeded, jobSetupCleanupNeeded);
- } else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
- }
+ t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
}
if (jobCleanup) {
t.setJobCleanupTask();
@@ -1236,17 +1187,6 @@ class TaskInProgress {
return t;
}
- // GRR FIXME? more efficient just to pass splitInfo directly...any need for
- // rest of it in UberTask?
- TaskSplitIndex[] getSplitIndexArray() {
- int numSplits = splitInfo.length;
- TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits];
- for (int i = 0; i < numSplits; ++i) {
- splitIndex[i] = splitInfo[i].getSplitIndex();
- }
- return splitIndex;
- }
-
boolean isRunningTask(TaskAttemptID taskid) {
TaskStatus status = taskStatuses.get(taskid);
return status != null && status.getRunState() == TaskStatus.State.RUNNING;
@@ -1296,8 +1236,7 @@ class TaskInProgress {
}
/**
- * Get the task index of this map or reduce task. For example,
- * "task_201011230308_87259_r_000240" would return 240.
+ * Get the id of this map or reduce task.
* @return The index of this tip in the maps/reduces lists.
*/
public int getIdWithinJob() {
@@ -1317,7 +1256,7 @@ class TaskInProgress {
public int getSuccessEventNumber() {
return successEventNumber;
}
-
+
/**
* Gets the Node list of input split locations sorted in rack order.
*/
@@ -1325,7 +1264,7 @@ class TaskInProgress {
if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
- String[] splits = splitInfo[0].getLocations(); // actually replicas
+ String[] splits = splitInfo.getLocations();
Node[] nodes = new Node[splits.length];
for (int i = 0; i < splits.length; i++) {
nodes[i] = jobtracker.getNode(splits[i]);
@@ -1354,20 +1293,13 @@ class TaskInProgress {
}
public long getMapInputSize() {
- if (isUberTask()) {
- int numSplits = splitInfo.length;
- long sumInputDataLength = 0;
- for (int i = 0; i < numSplits; ++i) {
- sumInputDataLength += splitInfo[i].getInputDataLength();
- }
- return sumInputDataLength;
- } else if (isMapTask() && !jobSetup && !jobCleanup) {
- return splitInfo[0].getInputDataLength();
+ if(isMapTask() && !jobSetup && !jobCleanup) {
+ return splitInfo.getInputDataLength();
} else {
return 0;
}
}
-
+
/**
* Compare most recent task attempts dispatch time to current system time so
* that task progress rate will slow down as time proceeds even if no progress
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar 8 05:54:02 2011
@@ -41,15 +41,10 @@ public abstract class TaskStatus impleme
static final Log LOG =
LogFactory.getLog(TaskStatus.class.getName());
- // what kind of TaskStatus is it?
+ //enumeration for reporting current phase of a task.
@InterfaceAudience.Private
@InterfaceStability.Unstable
- public static enum Type {MAP, REDUCE, UBER}
-
- // enumeration for reporting current phase of a task.
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- public static enum Phase {STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
+ public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
// what state is the task in?
@InterfaceAudience.Private
@@ -110,7 +105,6 @@ public abstract class TaskStatus impleme
public TaskAttemptID getTaskID() { return taskid; }
public abstract boolean getIsMap();
- public abstract boolean getIsUber();
public int getNumSlots() {
return numSlots;
}
@@ -501,61 +495,46 @@ public abstract class TaskStatus impleme
//////////////////////////////////////////////////////////////////////////////
// Factory-like methods to create/read/write appropriate TaskStatus objects
//////////////////////////////////////////////////////////////////////////////
-
- // this is the main one used by TT.TIP, JIP, and (apparently) all the
- // relevant tests (CapacityTestUtils, TestFairScheduler, TestClusterStatus,
- // TestJobInProgress, TestJobTrackerInstrumentation, FakeObjectUtilities)
- // [also formerly MapTask and ReduceTask, but no longer]
+
+ static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId,
+ float progress, int numSlots,
+ State runState, String diagnosticInfo,
+ String stateString, String taskTracker,
+ Phase phase, Counters counters)
+ throws IOException {
+ boolean isMap = in.readBoolean();
+ return createTaskStatus(isMap, taskId, progress, numSlots, runState,
+ diagnosticInfo, stateString, taskTracker, phase,
+ counters);
+ }
+
static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId,
float progress, int numSlots,
State runState, String diagnosticInfo,
String stateString, String taskTracker,
Phase phase, Counters counters) {
return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState,
- diagnosticInfo, stateString, taskTracker,
+ diagnosticInfo, stateString, taskTracker,
phase, counters) :
new ReduceTaskStatus(taskId, progress, numSlots, runState,
diagnosticInfo, stateString,
taskTracker, phase, counters);
}
-
- // used only in default ctors of Task (also formerly MapTask, ReduceTask) and
- // readTaskStatus() below
- static TaskStatus createTaskStatus(Type tsType) {
- return (tsType == TaskStatus.Type.MAP)
- ? new MapTaskStatus()
- : (tsType == TaskStatus.Type.REDUCE)
- ? new ReduceTaskStatus()
- : new UberTaskStatus();
+
+ static TaskStatus createTaskStatus(boolean isMap) {
+ return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
}
static TaskStatus readTaskStatus(DataInput in) throws IOException {
boolean isMap = in.readBoolean();
- boolean isUber = in.readBoolean();
- Type tsType = isMap
- ? TaskStatus.Type.MAP
- : isUber
- ? TaskStatus.Type.UBER
- : TaskStatus.Type.REDUCE;
- TaskStatus taskStatus = createTaskStatus(tsType);
+ TaskStatus taskStatus = createTaskStatus(isMap);
taskStatus.readFields(in);
return taskStatus;
}
static void writeTaskStatus(DataOutput out, TaskStatus taskStatus)
throws IOException {
-/* LATER
- * //GRR FIXME: longer-term, just store tsType as member var (but then need
- * // to modify or add new ctor: used in many places)
- * Type tsType = taskStatus.getIsUber()
- * ? TaskStatus.Type.UBER
- * : taskStatus.getIsMap()
- * ? TaskStatus.Type.MAP
- * : TaskStatus.Type.REDUCE;
- * WritableUtils.writeEnum(out, tsType); (or enum.ordinal() [as in MR-901] or ...)
- */
out.writeBoolean(taskStatus.getIsMap());
- out.writeBoolean(taskStatus.getIsUber());
taskStatus.write(out);
}
}
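
The serialization contract restored above is a tagged record: writeTaskStatus emits one isMap boolean and then the subclass payload; readTaskStatus reads the boolean, instantiates the matching subclass via createTaskStatus(boolean), and calls readFields. A hypothetical round-trip check of that contract (the helper class and buffer plumbing are assumptions; the two static calls are the ones in the hunks above, so the helper would live in the same package):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical same-package helper exercising the one-boolean format:
// serialize a status, read it back, and get the matching subclass.
final class TaskStatusRoundTrip {
  static TaskStatus roundTrip(TaskStatus status) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    TaskStatus.writeTaskStatus(new DataOutputStream(buf), status); // isMap flag, then payload
    return TaskStatus.readTaskStatus(                              // reads flag, picks subclass
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
  }
}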
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 8 05:54:02 2011
@@ -827,7 +827,7 @@ public class TaskTracker
f = rjob.getFetchStatus();
for (TaskInProgress tip : rjob.tasks) {
Task task = tip.getTask();
- if (!task.isMapTask() && !task.isUberTask()) {
+ if (!task.isMapTask()) {
if (((ReduceTask)task).getPhase() ==
TaskStatus.Phase.SHUFFLE) {
if (rjob.getFetchStatus() == null) {
@@ -2502,27 +2502,21 @@ public class TaskTracker
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
- if (task.isUberTask()) {
- taskStatus = new UberTaskStatus(
- task.getTaskID(), 0.0f,
- task.getNumSlotsRequired(), task.getState(),
- diagnosticInfo.toString(), "initializing", getName(),
- TaskStatus.Phase.MAP, task.getCounters());
- } else {
- taskStatus = TaskStatus.createTaskStatus(
- task.isMapTask(), task.getTaskID(), 0.0f,
- task.getNumSlotsRequired(), task.getState(),
- diagnosticInfo.toString(), "initializing", getName(),
- task.isTaskCleanupTask()
- ? TaskStatus.Phase.CLEANUP
- : task.isMapTask()
- ? TaskStatus.Phase.MAP
- : TaskStatus.Phase.SHUFFLE,
- task.getCounters());
- }
+ taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
+ 0.0f,
+ task.getNumSlotsRequired(),
+ task.getState(),
+ diagnosticInfo.toString(),
+ "initializing",
+ getName(),
+ task.isTaskCleanupTask() ?
+ TaskStatus.Phase.CLEANUP :
+ task.isMapTask()? TaskStatus.Phase.MAP:
+ TaskStatus.Phase.SHUFFLE,
+ task.getCounters());
taskTimeout = (10 * 60 * 1000);
}
-
+
void localizeTask(Task task) throws IOException{
FileSystem localFs = FileSystem.getLocal(fConf);
@@ -2639,7 +2633,7 @@ public class TaskTracker
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
LOG.info("Not launching task: " + task.getTaskID() +
- " since its state is " + this.taskStatus.getRunState());
+ " since it's state is " + this.taskStatus.getRunState());
}
}
@@ -3212,7 +3206,7 @@ public class TaskTracker
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
if (!jvmManager.isJvmKnown(jvmId)) {
- LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME: bug? no (apparent) killing going on here...
+ LOG.info("Killing unknown JVM " + jvmId);
return new JvmTask(null, true);
}
RunningJob rjob = runningJobs.get(jvmId.getJobId());
@@ -3243,7 +3237,7 @@ public class TaskTracker
public synchronized boolean statusUpdate(TaskAttemptID taskid,
TaskStatus taskStatus)
throws IOException {
- TaskInProgress tip = tasks.get(taskid); // TT.TIP, not TaskInProgress.java
+ TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
tip.reportProgress(taskStatus);
myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
@@ -3691,15 +3685,17 @@ public class TaskTracker
}
userName = rjob.jobConf.getUser();
}
-
- // Map-output filename ("... /file.out")
- StringBuilder sb = new StringBuilder();
- sb.append(TaskTracker.getIntermediateOutputDir(userName, jobId, mapId))
- .append("/").append(MapOutputFile.MAP_OUTPUT_FILENAME_STRING);
- Path mapOutputFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
- // Index filename ("... /file.out.index")
- sb.append(MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING);
- Path indexFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf);
+ // Index file
+ Path indexFileName =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ userName, jobId, mapId)
+ + "/file.out.index", conf);
+
+ // Map-output file
+ Path mapOutputFileName =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ userName, jobId, mapId)
+ + "/file.out", conf);
/**
* Read the index file to get the information about where the map-output
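
The hunk above goes back to deriving both local paths from the same per-attempt directory string. Per the literals in the diff, the layout on a tasktracker's local disk is file.out (the serialized map output) next to file.out.index (the per-reducer offsets into it); a condensed sketch of the lookup, using the same variable names as the surrounding method:

// Condensed from the hunk above: both artifacts hang off one base path.
String base = TaskTracker.getIntermediateOutputDir(userName, jobId, mapId);
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(base + "/file.out", conf);        // map output data
Path indexFileName     = lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);  // per-reducer offsets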
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar 8 05:54:02 2011
@@ -271,19 +271,4 @@ public interface MRJobConfig {
"mapreduce.job.submithostname";
public static final String JOB_SUBMITHOSTADDR =
"mapreduce.job.submithostaddress";
-
- public static final String JOB_UBERTASK_ENABLE =
- "mapreduce.job.ubertask.enable";
- public static final String JOB_UBERTASK_MAXMAPS =
- "mapreduce.job.ubertask.maxmaps";
- public static final String JOB_UBERTASK_MAXREDUCES =
- "mapreduce.job.ubertask.maxreduces";
- public static final String JOB_UBERTASK_MAXBYTES =
- "mapreduce.job.ubertask.maxbytes";
- public static final String UBERTASK_JAVA_OPTS =
- "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts?
- public static final String UBERTASK_ULIMIT =
- "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit?
- public static final String UBERTASK_ENV =
- "mapreduce.ubertask.child.env"; // or mapreduce.uber.env?
}
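
These constants were the ubertask feature's configuration surface; before this revert, the feature would have been gated roughly as below (Configuration.getBoolean/getInt/getLong are the standard accessors; the default values shown are illustrative, not taken from this diff):

// Pre-revert usage sketch; after this commit the JOB_UBERTASK_* constants
// no longer exist, so the keys are spelled out. Defaults are illustrative.
boolean uberEnabled = conf.getBoolean("mapreduce.job.ubertask.enable", false);
int maxUberMaps     = conf.getInt("mapreduce.job.ubertask.maxmaps", 9);
int maxUberReduces  = conf.getInt("mapreduce.job.ubertask.maxreduces", 1);
long maxUberBytes   = conf.getLong("mapreduce.job.ubertask.maxbytes", 64L * 1024 * 1024);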
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar 8 05:54:02 2011
@@ -64,9 +64,6 @@
{"name": "launchTime", "type": "long"},
{"name": "totalMaps", "type": "int"},
{"name": "totalReduces", "type": "int"},
- {"name": "isUber", "type": "boolean"},
- {"name": "numUberSubMaps", "type": "int"},
- {"name": "numUberSubReduces", "type": "int"},
{"name": "jobStatus", "type": "string"}
]
},
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Tue Mar 8 05:54:02 2011
@@ -526,8 +526,6 @@ public class HistoryViewer {
int totalReduces = 0;
int totalCleanups = 0;
int totalSetups = 0;
- int numUberSubMaps = 0;
- int numUberSubReduces = 0;
int numFailedMaps = 0;
int numKilledMaps = 0;
int numFailedReduces = 0;
@@ -546,20 +544,15 @@ public class HistoryViewer {
long cleanupFinished = 0;
long setupStarted = 0;
long setupFinished = 0;
- boolean isUber = false;
-
+
/** Get total maps */
public int getTotalMaps() { return totalMaps; }
/** Get total reduces */
public int getTotalReduces() { return totalReduces; }
- /** Get number of cleanup tasks */
+ /** Get number of clean up tasks */
public int getTotalCleanups() { return totalCleanups; }
- /** Get number of setup tasks */
+ /** Get number of set up tasks */
public int getTotalSetups() { return totalSetups; }
- /** Get number of map subtasks within UberTask */
- public int getNumUberSubMaps() { return numUberSubMaps; }
- /** Get number of reduce subtasks within UberTask */
- public int getNumUberSubReduces() { return numUberSubReduces; }
/** Get number of failed maps */
public int getNumFailedMaps() { return numFailedMaps; }
/** Get number of killed maps */
@@ -574,11 +567,11 @@ public class HistoryViewer {
public int getNumFailedCleanups() { return numFailedCleanups; }
/** Get number of killed cleanup tasks */
public int getNumKilledCleanups() { return numKilledCleanups; }
- /** Get number of finished setup tasks */
+ /** Get number of finished set up tasks */
public int getNumFinishedSetups() { return numFinishedSetups; }
- /** Get number of failed setup tasks */
+ /** Get number of failed set up tasks */
public int getNumFailedSetups() { return numFailedSetups; }
- /** Get number of killed setup tasks */
+ /** Get number of killed set up tasks */
public int getNumKilledSetups() { return numKilledSetups; }
/** Get number of maps that were started */
public long getMapStarted() { return mapStarted; }
@@ -596,10 +589,8 @@ public class HistoryViewer {
public long getSetupStarted() { return setupStarted; }
/** Get number of setup tasks that finished */
public long getSetupFinished() { return setupFinished; }
- /** Get job's UberTask/non-UberTask status */
- public boolean isUber() { return isUber; }
- /** Create summary information for the parsed job */
+ /** Create summary information for the parsed job */
public SummarizedJob(JobInfo job) {
tasks = job.getAllTasks();
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Mar 8 05:54:02 2011
@@ -302,9 +302,6 @@ public class JobHistoryParser {
info.launchTime = event.getLaunchTime();
info.totalMaps = event.getTotalMaps();
info.totalReduces = event.getTotalReduces();
- info.isUber = event.getIsUber();
- info.numUberSubMaps = event.getNumUberSubMaps();
- info.numUberSubReduces = event.getNumUberSubReduces();
}
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
@@ -337,9 +334,6 @@ public class JobHistoryParser {
long launchTime;
int totalMaps;
int totalReduces;
- boolean isUber;
- int numUberSubMaps;
- int numUberSubReduces;
int failedMaps;
int failedReduces;
int finishedMaps;
@@ -358,9 +352,8 @@ public class JobHistoryParser {
*/
public JobInfo() {
submitTime = launchTime = finishTime = -1;
- isUber = false;
- totalMaps = totalReduces = numUberSubMaps = numUberSubReduces = 0;
- failedMaps = failedReduces = finishedMaps = finishedReduces = 0;
+ totalMaps = totalReduces = failedMaps = failedReduces = 0;
+ finishedMaps = finishedReduces = 0;
username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
jobACLs = new HashMap<JobACL, AccessControlList>();
@@ -377,7 +370,6 @@ public class JobHistoryParser {
System.out.println("PRIORITY: " + priority);
System.out.println("TOTAL_MAPS: " + totalMaps);
System.out.println("TOTAL_REDUCES: " + totalReduces);
- //GRR FIXME: add UBER_SUBMAPS and UBER_SUBREDUCES? (or only if isUber == true? coordinate with TaskInfo printAll() changes)
System.out.println("MAP_COUNTERS:" + mapCounters.toString());
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
@@ -403,24 +395,18 @@ public class JobHistoryParser {
public String getJobConfPath() { return jobConfPath; }
/** Get the job launch time */
public long getLaunchTime() { return launchTime; }
- /** Get the total number of "real" maps */
- public int getTotalMaps() { return totalMaps; }
- /** Get the total number of "real" reduces */
- public int getTotalReduces() { return totalReduces; }
- /** Was the job small enough to be converted to an UberTask? */
- public boolean getIsUber() { return isUber; }
- /** Get the number of sub-MapTasks within the UberTask */
- public int getNumUberSubMaps() { return numUberSubMaps; }
- /** Get the number of sub-ReduceTasks within the UberTask */
- public int getNumUberSubReduces() { return numUberSubReduces; }
+ /** Get the total number of maps */
+ public long getTotalMaps() { return totalMaps; }
+ /** Get the total number of reduces */
+ public long getTotalReduces() { return totalReduces; }
/** Get the total number of failed maps */
- public int getFailedMaps() { return failedMaps; }
+ public long getFailedMaps() { return failedMaps; }
/** Get the number of failed reduces */
- public int getFailedReduces() { return failedReduces; }
+ public long getFailedReduces() { return failedReduces; }
/** Get the number of finished maps */
- public int getFinishedMaps() { return finishedMaps; }
+ public long getFinishedMaps() { return finishedMaps; }
/** Get the number of finished reduces */
- public int getFinishedReduces() { return finishedReduces; }
+ public long getFinishedReduces() { return finishedReduces; }
/** Get the job status */
public String getJobStatus() { return jobStatus; }
public String getErrorInfo() { return errorInfo; }
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Tue Mar 8 05:54:02 2011
@@ -44,15 +44,11 @@ public class JobInitedEvent implements H
* @param jobStatus
*/
public JobInitedEvent(JobID id, long launchTime, int totalMaps,
- int totalReduces, boolean isUber, int numUberSubMaps,
- int numUberSubReduces, String jobStatus) {
+ int totalReduces, String jobStatus) {
datum.jobid = new Utf8(id.toString());
datum.launchTime = launchTime;
datum.totalMaps = totalMaps;
datum.totalReduces = totalReduces;
- datum.isUber = isUber;
- datum.numUberSubMaps = numUberSubMaps;
- datum.numUberSubReduces = numUberSubReduces;
datum.jobStatus = new Utf8(jobStatus);
}
@@ -65,16 +61,10 @@ public class JobInitedEvent implements H
public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
/** Get the launch time */
public long getLaunchTime() { return datum.launchTime; }
- /** Get the total number of "real" maps */
+ /** Get the total number of maps */
public int getTotalMaps() { return datum.totalMaps; }
- /** Get the total number of "real" reduces */
+ /** Get the total number of reduces */
public int getTotalReduces() { return datum.totalReduces; }
- /** Was the job small enough to be converted to an UberTask? */
- public boolean getIsUber() { return datum.isUber; }
- /** Get the number of sub-MapTasks within the UberTask */
- public int getNumUberSubMaps() { return datum.numUberSubMaps; }
- /** Get the number of sub-ReduceTasks within the UberTask */
- public int getNumUberSubReduces() { return datum.numUberSubReduces; }
/** Get the status */
public String getStatus() { return datum.jobStatus.toString(); }
/** Get the event type */
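For reference, constructing the event after this change takes only five arguments. The sketch below uses made-up values; the job ID string and the "PREP" status are illustrative, not taken from this commit:

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;

    public class JobInitedEventDemo {
      public static void main(String[] args) {
        JobID id = JobID.forName("job_201103080554_0001");
        // five-argument form: id, launchTime, totalMaps, totalReduces, status
        JobInitedEvent event =
            new JobInitedEvent(id, System.currentTimeMillis(), 10, 2, "PREP");
        System.out.println(event.getTotalMaps() + " maps, "
            + event.getTotalReduces() + " reduces");
      }
    }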
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Mar 8 05:54:02 2011
@@ -241,12 +241,8 @@ public class TestJobQueueTaskScheduler e
}
@Override
- public boolean getIsUber() {
- return t.isUberTask();
- }
-
- @Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+
}
};
status.setRunState(TaskStatus.State.RUNNING);
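With getIsUber() gone from TaskStatus, the test's anonymous stub shrinks to just the overrides shown in the hunk above. A sketch of the resulting pattern, assuming (as the hunk suggests) that getIsMap() is the one member a stub must still supply:

    package org.apache.hadoop.mapred;

    // Sketch only: placed in the same package as the test, and assuming
    // TaskStatus still exposes a no-arg constructor for subclassing.
    public class StubStatusDemo {
      public static void main(String[] args) {
        TaskStatus status = new TaskStatus() {
          @Override
          public boolean getIsMap() { return true; }
          @Override
          public void addFetchFailedMap(TaskAttemptID mapTaskId) {
            // kept as a no-op, mirroring the test stub above
          }
        };
        status.setRunState(TaskStatus.State.RUNNING);
      }
    }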
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java Tue Mar 8 05:54:02 2011
@@ -103,7 +103,7 @@ public class TestTaskStatus {
// check the default case
String test = "hi";
final int maxSize = 16;
- TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null,
+ TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null,
null) {
@Override
protected int getMaxStringSize() {
@@ -118,11 +118,6 @@ public class TestTaskStatus {
public boolean getIsMap() {
return false;
}
-
- @Override
- public boolean getIsUber() {
- return false;
- }
};
assertEquals("Small diagnostic info test failed",
status.getDiagnosticInfo(), test);
@@ -203,11 +198,6 @@ public class TestTaskStatus {
public boolean getIsMap() {
return false;
}
-
- @Override
- public boolean getIsUber() {
- return false;
- }
};
assertEquals("Large diagnostic info test failed",
maxSize, status.getDiagnosticInfo().length());
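The same cleanup applies in TestTaskStatus: what remains of each anonymous subclass is the truncation hook the test actually exercises. A sketch of the post-patch pattern, assuming the nine-argument constructor shown in the hunk above:

    package org.apache.hadoop.mapred;

    // Sketch of the test's remaining pattern: getMaxStringSize() caps
    // diagnostic info, and no getIsUber() override is needed any more.
    public class DiagnosticTruncationDemo {
      public static void main(String[] args) {
        final int maxSize = 16;
        TaskStatus status = new TaskStatus(null, 0, 0, null, "hi", "hi",
                                           null, null, null) {
          @Override
          protected int getMaxStringSize() { return maxSize; }
          @Override
          public boolean getIsMap() { return false; }
        };
        status.setDiagnosticInfo("a diagnostic string longer than the cap");
        // Per the assertions above, the stored string is truncated to at
        // most maxSize characters:
        System.out.println(status.getDiagnosticInfo().length());
      }
    }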
Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Tue Mar 8 05:54:02 2011
@@ -125,15 +125,10 @@ public class Job20LineHistoryEventEmitte
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
- // note: UberTask playback not supported since uber data not yet logged
- boolean isUber = false;
- int numUberSubMaps = 0;
- int numUberSubReduces = 0;
if (launchTime != null && totalMaps != null && totalReduces != null) {
- return new JobInitedEvent(jobID, Long.parseLong(launchTime),
- Integer.parseInt(totalMaps), Integer.parseInt(totalReduces),
- isUber, numUberSubMaps, numUberSubReduces, status);
+ return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
+ .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
}
return null;
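The reflowed call in Job20LineHistoryEventEmitter is functionally identical to the single-line Integer.parseInt(totalMaps) form; only the three uber arguments are gone. One caveat a sketch makes visible: the parses throw NumberFormatException on malformed 0.20-format history fields. The guard below is an illustration, not part of the commit, and maybeInitEvent is a hypothetical helper:

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;

    class InitEventGuardDemo {
      // Illustrative defensive variant of the emitter logic above.
      static JobInitedEvent maybeInitEvent(JobID jobID, String launchTime,
          String totalMaps, String totalReduces, String status) {
        if (launchTime == null || totalMaps == null || totalReduces == null) {
          return null;
        }
        try {
          return new JobInitedEvent(jobID, Long.parseLong(launchTime),
              Integer.parseInt(totalMaps), Integer.parseInt(totalReduces),
              status);
        } catch (NumberFormatException e) {
          return null; // malformed history line; skip the event
        }
      }
    }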
Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp Tue Mar 8 05:54:02 2011
@@ -365,16 +365,10 @@
"<th>Killed</th>" +
"<th><a href=\"jobfailures.jsp?jobid=" + jobId +
"\">Failed/Killed<br>Task Attempts</a></th></tr>\n");
- if (job.getUberMode()) {
- /* placeholder until true task- and attempt-level uber info available */
- printTaskSummary(out, jobId, "uber", status.reduceProgress(),
- job.getTasks(TaskType.REDUCE));
- } else {
- printTaskSummary(out, jobId, "map", status.mapProgress(),
- job.getTasks(TaskType.MAP));
- printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
- job.getTasks(TaskType.REDUCE));
- }
+ printTaskSummary(out, jobId, "map", status.mapProgress(),
+ job.getTasks(TaskType.MAP));
+ printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
+ job.getTasks(TaskType.REDUCE));
out.print("</table>\n");
%>
@@ -427,7 +421,6 @@
%>
</table>
-<%if (job.getTasks(TaskType.MAP).length > 0) { %>
<hr>Map Completion Graph -
<%
if("off".equals(request.getParameter("map.graph"))) {
@@ -449,14 +442,10 @@ if("off".equals(session.getAttribute("ma
width="<%=TaskGraphServlet.width + 2 * TaskGraphServlet.xmargin%>"
height="<%=TaskGraphServlet.height + 3 * TaskGraphServlet.ymargin%>"
style="width:100%" type="image/svg+xml" pluginspage="http://www.adobe.com/svg/viewer/install/" />
-<%} }%>
+<%}%>
-<%if (job.getTasks(TaskType.REDUCE).length > 0) { %>
-<%if (job.getUberMode()) { %>
-<hr>UberTask Completion Graph -
-<%} else { %>
+<%if(job.getTasks(TaskType.REDUCE).length > 0) { %>
<hr>Reduce Completion Graph -
-<%}%>
<%if("off".equals(session.getAttribute("reduce.graph"))) { %>
<a href="/jobdetails.jsp?jobid=<%=jobId%>&refresh=<%=refresh%>&reduce.graph=on" > open </a>
<%} else { %>
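Net effect of the two jobdetails.jsp hunks above, sketched as plain Java control flow: the map summary, reduce summary, and map completion graph now render unconditionally, while the reduce graph stays gated on the job having reduces. Everything in this sketch is a hypothetical stand-in for the JSP's scriptlets and markup:

    // Self-contained control-flow sketch; Renderer and the section names
    // are stand-ins, not part of the JSP.
    class JobDetailsFlow {
      interface Renderer { void render(String section); }

      static void renderPage(Renderer r, int numMaps, int numReduces) {
        r.render("map summary");          // always: uber branch removed
        r.render("reduce summary");       // always: uber branch removed
        r.render("map completion graph"); // guard on map count removed
        if (numReduces > 0) {             // reduce graph still conditional
          r.render("reduce completion graph");
        }
      }
    }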
Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp?rev=1079192&r1=1079191&r2=1079192&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp Tue Mar 8 05:54:02 2011
@@ -56,12 +56,6 @@
if (job == null) {
return;
}
- // MR-1220 FIXME (LATER): to fully integrate uberization, need task-
- // and attempt-level JobHistory/Avro/etc. changes to multiple Event
- // types; instead, going with top-level hack for now
- boolean isUber = job.getIsUber();
- int numUberSubMaps = job.getNumUberSubMaps();
- int numUberSubReduces = job.getNumUberSubReduces();
if (job.getJobStatus().equals("FAILED"))
reasonforFailure = job.getErrorInfo();
%>
@@ -96,7 +90,7 @@
<center>
<table border="2" cellpadding="5" cellspacing="2">
<tr>
-<td>Kind</td><td>Total Tasks (successful+<wbr>failed+<wbr>killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
+<td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
</tr>
<tr>
<td>Setup</td>
@@ -112,10 +106,6 @@
<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getSetupFinished(), sj.getSetupStarted()) %></td>
</tr>
<tr>
-
-<%
- if (!isUber) {
-%>
<td>Map</td>
<td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=MAP&status=all">
<%=sj.getTotalMaps()%></a></td>
@@ -141,36 +131,6 @@
<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %></td>
<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %></td>
</tr>
-
-<%
- } else /* isUber */ {
-%>
-
-<tr>
-<td>Uber</td>
- <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=all">
- <%=sj.getTotalReduces()%></a></td>
- <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=SUCCEEDED">
- <%=job.getFinishedReduces()%></a></td>
- <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=FAILED">
- <%=sj.getNumFailedReduces()%></a></td>
- <td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=REDUCE&status=KILLED">
- <%=sj.getNumKilledReduces()%></a></td>
- <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %></td>
- <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %></td>
-</tr>
-<td>Map subtasks</td>
- <td colspan="6"><%=numUberSubMaps%></td>
-</tr>
-<tr>
-<td>Reduce subtasks</td>
- <td colspan="6"><%=numUberSubReduces%></td>
-</tr>
-
-<%
- }
-%>
-
<tr>
<td>Cleanup</td>
<td><a href="jobtaskshistory.jsp?logFile=<%=logFile%>&taskType=JOB_CLEANUP&status=all">