Author: acmurthy
Date: Fri Aug 12 21:05:10 2011
New Revision: 1157253
URL: http://svn.apache.org/viewvc?rev=1157253&view=rev
Log:
MAPREDUCE-2037. Capture intermediate progress, CPU and memory usage for tasks. Contributed by Dick King.
Added:
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
Modified:
hadoop/common/trunk/mapreduce/CHANGES.txt
hadoop/common/trunk/mapreduce/src/java/mapred-default.xml
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Fri Aug 12 21:05:10 2011
@@ -38,6 +38,9 @@ Trunk (unreleased changes)
MAPREDUCE-2323. Add metrics to the fair scheduler. (todd)
+ MAPREDUCE-2037. Capture intermediate progress, CPU and memory usage for
+ tasks. (Dick King via acmurthy)
+
IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
Modified: hadoop/common/trunk/mapreduce/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/mapred-default.xml?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/mapred-default.xml (original)
+++ hadoop/common/trunk/mapreduce/src/java/mapred-default.xml Fri Aug 12 21:05:10 2011
@@ -33,6 +33,29 @@
</property>
<property>
+ <name>mapreduce.jobtracker.jobhistory.task.numberprogresssplits</name>
+ <value>12</value>
+ <description> Every task attempt progresses from 0.0 to 1.0 [unless
+ it fails or is killed]. We record, for each task attempt, certain
+ statistics over each twelfth of the progress range. You can change
+ the number of intervals we divide the entire range of progress into
+ by setting this property. Higher values give more precision to the
+ recorded data, but cost more memory in the job tracker at runtime.
+ Each increment in this attribute costs 16 bytes per running task.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.job.userhistorylocation</name>
+ <value></value>
+ <description> The user can specify a location to store the history files of
+ a particular job. If nothing is specified, the logs are stored in the
+ job's output directory, under "_logs/history/".
+ The user can disable logging by specifying the value "none".
+ </description>
+</property>
+
+<property>
<name>mapreduce.jobtracker.jobhistory.completed.location</name>
<value></value>
<description> The completed job history files are stored at this single well
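For illustration, a minimal sketch (not part of this commit) of setting the new knob programmatically; it uses the JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS constant that JTConfig gains below, which names this same key:

    // Raise the split count from the default 12 to 24. Per the
    // description above, each increment costs 16 bytes per running
    // task (four int-valued buckets of 4 bytes each).
    JobConf conf = new JobConf();
    conf.setInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS, 24);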
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/Counters.java Fri Aug 12 21:05:10 2011
@@ -413,23 +413,28 @@ public class Counters implements Writabl
* with the specified name.
*/
public synchronized Group getGroup(String groupName) {
- // To provide support for deprecated group names
- if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
- groupName = "org.apache.hadoop.mapreduce.TaskCounter";
- LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
- " Use org.apache.hadoop.mapreduce.TaskCounter instead");
- } else if (groupName.equals(
- "org.apache.hadoop.mapred.JobInProgress$Counter")) {
- groupName = "org.apache.hadoop.mapreduce.JobCounter";
- LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
- "is deprecated. Use " +
- "org.apache.hadoop.mapreduce.JobCounter instead");
- }
Group result = counters.get(groupName);
+
if (result == null) {
+ // To provide support for deprecated group names
+ if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
+ LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
+ " Use org.apache.hadoop.mapreduce.TaskCounter instead");
+ return getGroup("org.apache.hadoop.mapreduce.TaskCounter");
+ }
+
+ if (groupName.equals
+ ("org.apache.hadoop.mapred.JobInProgress$Counter")) {
+ LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
+ "is deprecated. Use " +
+ "org.apache.hadoop.mapreduce.JobCounter instead");
+ return getGroup("org.apache.hadoop.mapreduce.JobCounter");
+ }
+
result = new Group(groupName);
counters.put(groupName, result);
}
+
return result;
}
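The reordering above changes behavior slightly: a group already stored under a deprecated name is now returned as-is, and the name mapping only applies on a miss, so no empty group is created under the old name. A minimal sketch of the resulting lookup (not part of this commit):

    // The deprecated name resolves, with a warning, to the new group.
    Counters counters = new Counters();
    Counters.Group group =
        counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
    // group.getName() is "org.apache.hadoop.mapreduce.TaskCounter"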
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java?rev=1157253&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java Fri Aug 12 21:05:10 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ * measurements where each raw datum is a reading of an
+ * accumulation. The result in each bucket is the estimate
+ * of the progress-weighted change in that quantity over the
+ * progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ * a distance traveled. It makes sense to consider that portion of
+ * the total travel that can be apportioned to each bucket.
+ *
+ */
+class CumulativePeriodicStats extends PeriodicStatsAccumulator {
+ // int's are acceptable here, even though times are normally
+ // long's, because these are a difference and an int won't
+ // overflow for 24 days. Tasks can't run for more than about a
+ // week for other reasons, so the difference fits comfortably.
+ int previousValue = 0;
+
+ CumulativePeriodicStats(int count) {
+ super(count);
+ }
+
+ /**
+ *
+ * accumulates a new reading by keeping a running account of the
+ * value distance from the beginning of the bucket to the end of
+ * this reading
+ */
+ @Override
+ protected void extendInternal(double newProgress, int newValue) {
+ if (state == null) {
+ return;
+ }
+
+ state.currentAccumulation += (double)(newValue - previousValue);
+ previousValue = newValue;
+ }
+}
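A worked sketch of the cumulative semantics (not part of this commit; the class is package-private, so this assumes code living in org.apache.hadoop.mapred):

    // Four buckets; a counter that grows by 100 over the first half of
    // progress and by 200 over the second half.
    CumulativePeriodicStats stats = new CumulativePeriodicStats(4);
    stats.extend(0.0D, 0);
    stats.extend(0.5D, 100);   // buckets 0 and 1 each receive 50
    stats.extend(1.0D, 300);   // buckets 2 and 3 each receive 100
    // stats.getValues() now holds {50, 50, 100, 100}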
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Aug 12 21:05:10 2011
@@ -2673,25 +2673,29 @@ public class JobInProgress {
status.getTaskTracker(), ttStatus.getHttpPort());
jobHistory.logEvent(tse, status.getTaskID().getJobID());
-
+ TaskAttemptID statusAttemptID = status.getTaskID();
if (status.getIsMap()){
MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
- status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+ statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getMapFinishTime(),
status.getFinishTime(), trackerHostname,
status.getStateString(),
- new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+ new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+ tip.getSplits(statusAttemptID).burst()
+ );
jobHistory.logEvent(mfe, status.getTaskID().getJobID());
}else{
ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
- status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+ statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
status.getShuffleFinishTime(),
status.getSortFinishTime(), status.getFinishTime(),
trackerHostname, status.getStateString(),
- new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+ new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+ tip.getSplits(statusAttemptID).burst()
+ );
jobHistory.logEvent(rfe, status.getTaskID().getJobID());
@@ -3171,12 +3175,16 @@ public class JobInProgress {
taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
jobHistory.logEvent(tse, taskid.getJobID());
+
+ ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
- TaskAttemptUnsuccessfulCompletionEvent tue =
- new TaskAttemptUnsuccessfulCompletionEvent(taskid,
- taskType, taskStatus.getRunState().toString(),
- finishTime,
- taskTrackerHostName, diagInfo);
+ TaskAttemptUnsuccessfulCompletionEvent tue =
+ new TaskAttemptUnsuccessfulCompletionEvent
+ (taskid,
+ taskType, taskStatus.getRunState().toString(),
+ finishTime,
+ taskTrackerHostName, diagInfo,
+ splits.burst());
jobHistory.logEvent(tue, taskid.getJobID());
// After this, try to assign tasks with the one after this, so that
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java?rev=1157253&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java Fri Aug 12 21:05:10 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This abstract class represents a bucketed series of
+ * measurements of a quantity being measured in a running task
+ * attempt.
+ *
+ * <p>The sole constructor is called with a count, which is the
+ * number of buckets into which we evenly divide the spectrum of
+ * progress from 0.0D to 1.0D. In the future we may provide for
+ * custom split points that don't have to be uniform.
+ *
+ * <p>A subclass determines how we fold readings for portions of a
+ * bucket and how we interpret the readings by overriding
+ * {@code extendInternal(...)} and {@code initializeInterval()}
+ */
+public abstract class PeriodicStatsAccumulator {
+ // The range of progress from 0.0D through 1.0D is divided into
+ // count "progress segments". This object accumulates an
+ // estimate of the effective value of a time-varying quantity during
+ // the zero-based i'th progress segment, ranging from i/count
+ // through (i+1)/count.
+ // This is an abstract class. We have two implementations: one
+ // for monotonically increasing time-dependent variables
+ // [currently, CPU time in milliseconds and wallclock time in
+ // milliseconds] and one for quantities that can vary arbitrarily
+ // over time, currently virtual and physical memory used, in
+ // kilobytes.
+ // We carry int's here. This saves a lot of JVM heap space in the
+ // job tracker per running task attempt [200 bytes each], but it
+ // has a small downside.
+ // No task attempt can run for more than 57 days nor occupy more
+ // than two terabytes of virtual memory.
+ protected final int count;
+ protected final int[] values;
+
+ static class StatsetState {
+ int oldValue = 0;
+ double oldProgress = 0.0D;
+
+ double currentAccumulation = 0.0D;
+ }
+
+ // We provide this level of indirection to reduce the memory
+ // footprint of done task attempts. When a task's progress
+ // reaches 1.0D, we delete this StatsetState object.
+ StatsetState state = new StatsetState();
+
+ PeriodicStatsAccumulator(int count) {
+ this.count = count;
+ this.values = new int[count];
+ for (int i = 0; i < count; ++i) {
+ values[i] = -1;
+ }
+ }
+
+ protected int[] getValues() {
+ return values;
+ }
+
+ // The concrete implementation of this abstract function
+ // accumulates more data into the current progress segment.
+ // newProgress [from the call] and oldProgress [from the object]
+ // must be in [or at the border of] a single progress segment.
+ /**
+ *
+ * adds a new reading to the current bucket.
+ *
+ * @param newProgress the endpoint of the interval this new
+ * reading covers
+ * @param newValue the value of the reading at {@code newProgress}
+ *
+ * The class has three instance variables: {@code oldProgress},
+ * {@code oldValue}, and {@code currentAccumulation}.
+ *
+ * {@code extendInternal} can count on three things:
+ *
+ * 1: The first time it's called in a particular instance, both
+ * oldXXX's will be zero.
+ *
+ * 2: oldXXX for a later call is the value of newXXX of the
+ * previous call. This ensures continuity in accumulation from
+ * one call to the next.
+ *
+ * 3: {@code currentAccumulation} is owned by
+ * {@code initializeInterval} and {@code extendInternal}.
+ */
+ protected abstract void extendInternal(double newProgress, int newValue);
+
+ // What has to be done when you open a new interval
+ /**
+ * initializes the state variables to be ready for a new interval
+ */
+ protected void initializeInterval() {
+ state.currentAccumulation = 0.0D;
+ }
+
+ // called for each new reading
+ /**
+ * This method calls {@code extendInternal} at least once. It
+ * divides the current progress interval [from the last call's
+ * {@code newProgress} to this call's {@code newProgress} ]
+ * into one or more subintervals by splitting at any point which
+ * is an interval boundary if there are any such points. It
+ * then calls {@code extendInternal} for each subinterval, or the
+ * whole interval if there are no splitting points.
+ *
+ * <p>For example, if the value was {@code 300} last time with
+ * {@code 0.3} progress, and count is {@code 5}, and you get a
+ * new reading with the variable at {@code 700} and progress at
+ * {@code 0.7}, you get three calls to {@code extendInternal}:
+ * one extending from progress {@code 0.3} to {@code 0.4} [the
+ * next boundary] with a value of {@code 400}, the next one
+ * through {@code 0.6} with a value of {@code 600}, and finally
+ * one at {@code 700} with a progress of {@code 0.7}.
+ *
+ * @param newProgress the endpoint of the progress range this new
+ * reading covers
+ * @param newValue the value of the reading at {@code newProgress}
+ */
+ protected void extend(double newProgress, int newValue) {
+ if (state == null || newProgress < state.oldProgress) {
+ return;
+ }
+
+ // The correctness of this code depends on 1.0D * count == count.
+ int oldIndex = (int)(state.oldProgress * count);
+ int newIndex = (int)(newProgress * count);
+ int originalOldValue = state.oldValue;
+
+ double fullValueDistance = (double)newValue - state.oldValue;
+ double fullProgressDistance = newProgress - state.oldProgress;
+ double originalOldProgress = state.oldProgress;
+
+ // In this loop we detect each subinterval boundary within the
+ // range from the old progress to the new one. Then we
+ // interpolate the value from the old value to the new one to
+ // infer what its value might have been at each such boundary.
+ // Lastly we make the necessary calls to extendInternal to fold
+ // in the data for each trapezoid, so that no trapezoid
+ // crosses a boundary.
+ for (int closee = oldIndex; closee < newIndex; ++closee) {
+ double interpolationProgress = (double)(closee + 1) / count;
+ // In floats, x * y / y might not equal x, so clamp to newProgress.
+ interpolationProgress = Math.min(interpolationProgress, newProgress);
+
+ double progressLength = (interpolationProgress - originalOldProgress);
+ double interpolationProportion = progressLength / fullProgressDistance;
+
+ double interpolationValueDistance
+ = fullValueDistance * interpolationProportion;
+
+ // estimates the value at the next [interpolated] subsegment boundary
+ int interpolationValue
+ = (int)interpolationValueDistance + originalOldValue;
+
+ extendInternal(interpolationProgress, interpolationValue);
+
+ advanceState(interpolationProgress, interpolationValue);
+
+ values[closee] = (int)state.currentAccumulation;
+ initializeInterval();
+
+ }
+
+ extendInternal(newProgress, newValue);
+ advanceState(newProgress, newValue);
+
+ if (newIndex == count) {
+ state = null;
+ }
+ }
+
+ protected void advanceState(double newProgress, int newValue) {
+ state.oldValue = newValue;
+ state.oldProgress = newProgress;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ int get(int index) {
+ return values[index];
+ }
+}
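The javadoc example for extend can be traced concretely (a sketch, not part of this commit; package-private access assumed):

    // count = 5; previous reading (0.3, 300), new reading (0.7, 700).
    // extend splits the interval at the bucket boundaries 0.4 and 0.6,
    // so extendInternal sees three sub-readings:
    //   extendInternal(0.4, 400)   // interpolated at boundary 2/5
    //   extendInternal(0.6, 600)   // interpolated at boundary 3/5
    //   extendInternal(0.7, 700)   // the reading itself
    PeriodicStatsAccumulator acc = new StatePeriodicStats(5);
    acc.extend(0.3D, 300);
    acc.extend(0.7D, 700);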
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java?rev=1157253&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java Fri Aug 12 21:05:10 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+/*
+ * This object gathers the [currently four] PeriodicStatsAccumulators
+ * that we maintain for a particular task attempt, packaging and
+ * handling them as a single object.
+ */
+public class ProgressSplitsBlock {
+ final PeriodicStatsAccumulator progressWallclockTime;
+ final PeriodicStatsAccumulator progressCPUTime;
+ final PeriodicStatsAccumulator progressVirtualMemoryKbytes;
+ final PeriodicStatsAccumulator progressPhysicalMemoryKbytes;
+
+ static final int[] NULL_ARRAY = new int[0];
+
+ static final int WALLCLOCK_TIME_INDEX = 0;
+ static final int CPU_TIME_INDEX = 1;
+ static final int VIRTUAL_MEMORY_KBYTES_INDEX = 2;
+ static final int PHYSICAL_MEMORY_KBYTES_INDEX = 3;
+
+ static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
+
+ ProgressSplitsBlock(int numberSplits) {
+ progressWallclockTime
+ = new CumulativePeriodicStats(numberSplits);
+ progressCPUTime
+ = new CumulativePeriodicStats(numberSplits);
+ progressVirtualMemoryKbytes
+ = new StatePeriodicStats(numberSplits);
+ progressPhysicalMemoryKbytes
+ = new StatePeriodicStats(numberSplits);
+ }
+
+ // this coordinates with LoggedTaskAttempt.SplitVectorKind
+ int[][] burst() {
+ int[][] result = new int[4][];
+
+ result[WALLCLOCK_TIME_INDEX] = progressWallclockTime.getValues();
+ result[CPU_TIME_INDEX] = progressCPUTime.getValues();
+ result[VIRTUAL_MEMORY_KBYTES_INDEX] = progressVirtualMemoryKbytes.getValues();
+ result[PHYSICAL_MEMORY_KBYTES_INDEX] = progressPhysicalMemoryKbytes.getValues();
+
+ return result;
+ }
+
+ static public int[] arrayGet(int[][] burstedBlock, int index) {
+ return burstedBlock == null ? NULL_ARRAY : burstedBlock[index];
+ }
+
+ static public int[] arrayGetWallclockTime(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, WALLCLOCK_TIME_INDEX);
+ }
+
+ static public int[] arrayGetCPUTime(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, CPU_TIME_INDEX);
+ }
+
+ static public int[] arrayGetVMemKbytes(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, VIRTUAL_MEMORY_KBYTES_INDEX);
+ }
+
+ static public int[] arrayGetPhysMemKbytes(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, PHYSICAL_MEMORY_KBYTES_INDEX);
+ }
+}
+
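A sketch of how a block is consumed (not part of this commit; package-private access assumed). burst() flattens the four accumulators into a single int[][] keyed by the index constants above, and the static arrayGet* helpers tolerate a null block:

    ProgressSplitsBlock block = new ProgressSplitsBlock(12);
    block.progressCPUTime.extend(0.5D, 3000);   // 3 CPU-seconds by halfway
    int[][] bursted = block.burst();
    int[] cpu = ProgressSplitsBlock.arrayGetCPUTime(bursted);  // length 12
    int[] none = ProgressSplitsBlock.arrayGetCPUTime(null);    // NULL_ARRAY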
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java?rev=1157253&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java Fri Aug 12 21:05:10 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ * measurements where each raw datum is a reading of a
+ * time-varying quantity. The result in each bucket is the estimate
+ * of the progress-weighted mean value of that quantity over the
+ * progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ * a temperature. It makes sense to consider the mean temperature
+ * over a progress range.
+ *
+ */
+class StatePeriodicStats extends PeriodicStatsAccumulator {
+ StatePeriodicStats(int count) {
+ super(count);
+ }
+
+ /**
+ *
+ * accumulates a new reading by keeping a running account of the
+ * area under the piecewise linear curve marked by pairs of
+ * {@code newProgress, newValue}.
+ */
+ @Override
+ protected void extendInternal(double newProgress, int newValue) {
+ if (state == null) {
+ return;
+ }
+
+ // the effective height of this trapezoid if rectangularized
+ double mean = ((double)newValue + (double)state.oldValue)/2.0D;
+
+ // conceptually mean * (newProgress - state.oldProgress) / (1 / count)
+ state.currentAccumulation += mean * (newProgress - state.oldProgress) * count;
+ }
+}
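A worked sketch of the mean semantics (not part of this commit; package-private access assumed):

    // A reading that ramps linearly from 0 to 100 over the first half
    // of progress, then holds at 100. With two buckets, the first
    // bucket reports the mean of the ramp, the second the steady value.
    StatePeriodicStats temps = new StatePeriodicStats(2);
    temps.extend(0.0D, 0);
    temps.extend(0.5D, 100);
    temps.extend(1.0D, 100);
    // temps.getValues() now holds {50, 100}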
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Aug 12 21:05:10 2011
@@ -31,25 +31,32 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
import org.apache.hadoop.mapred.SortedRanges.Range;
+
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
import org.apache.hadoop.net.Node;
+
/*************************************************************
* TaskInProgress maintains all the info needed for a
* Task in the lifetime of its owning Job. A given Task
* might be speculatively executed or reexecuted, so we
* need a level of indirection above the running-id itself.
* <br>
- * A given TaskInProgress contains multiple taskids,
+ * A given TaskInProgress contains multiple task attempt ids,
* 0 or more of which might be executing at any one time.
- * (That's what allows speculative execution.) A taskid
- * is now *never* recycled. A TIP allocates enough taskids
+ * (That's what allows speculative execution.) A task attempt id
+ * is now *never* recycled. A TIP allocates enough task attempt ids
* to account for all the speculation and failures it will
* ever have to handle. Once those are up, the TIP is dead.
* **************************************************************
@@ -60,6 +67,10 @@ class TaskInProgress {
static final long SPECULATIVE_LAG = 60 * 1000;
private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+ private static final long MEMORY_SPLITS_RESOLUTION = 1024;
+
+ static final int DEFAULT_STATISTICS_INTERVALS = 12;
+
public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
// Defines the TIP
@@ -91,6 +102,10 @@ class TaskInProgress {
private volatile boolean skipping = false;
private boolean jobCleanup = false;
private boolean jobSetup = false;
+
+ private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+ private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+ private static Enum PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -109,12 +124,20 @@ class TaskInProgress {
private JobConf conf;
private Map<TaskAttemptID,List<String>> taskDiagnosticData =
new TreeMap<TaskAttemptID,List<String>>();
+
/**
- * Map from taskId -> TaskStatus
+ * Map from task attempt Id -> TaskStatus
*/
TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
new TreeMap<TaskAttemptID,TaskStatus>();
+
+ /**
+ * Map from task attempt Id -> splits block
+ */
+ private Map<TaskAttemptID, ProgressSplitsBlock> splitsBlocks
+ = new TreeMap<TaskAttemptID, ProgressSplitsBlock>();
+
// Map from taskId -> TaskTracker Id,
// contains cleanup attempts and where they ran, if any
private TreeMap<TaskAttemptID, String> cleanupTasks =
@@ -183,6 +206,65 @@ class TaskInProgress {
}
this.user = job.getUser();
}
+
+ synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
+ ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);
+
+ if (result == null) {
+ result
+ = new ProgressSplitsBlock
+ (conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
+ ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
+ splitsBlocks.put(statusAttemptID, result);
+ }
+
+ return result;
+ }
+
+ private void updateProgressSplits(TaskStatus taskStatus) {
+ if (!taskStatus.getIncludeCounters()) {
+ return;
+ }
+
+ double newProgress = taskStatus.getProgress();
+
+ Counters counters = taskStatus.getCounters();
+
+ TaskAttemptID statusAttemptID = taskStatus.getTaskID();
+ ProgressSplitsBlock splitsBlock = getSplits(statusAttemptID);
+
+ if (splitsBlock != null) {
+
+ long now = JobTracker.getClock().getTime();
+ Long start = getDispatchTime(statusAttemptID);
+
+ if (start != null && now - start <= Integer.MAX_VALUE) {
+ splitsBlock.progressWallclockTime.extend
+ (newProgress, (int)(now - start));
+ }
+
+ Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
+ if (cpuCounter != null
+ && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
+ splitsBlock.progressCPUTime.extend
+ (newProgress, (int)(cpuCounter.getCounter()));
+ }
+
+ Counters.Counter virtualBytes = counters.findCounter(VM_BYTES_KEY);
+ if (virtualBytes != null) {
+ splitsBlock.progressVirtualMemoryKbytes.extend
+ (newProgress,
+ (int)(virtualBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+ }
+
+ Counters.Counter physicalBytes = counters.findCounter(PHYSICAL_BYTES_KEY);
+ if (physicalBytes != null) {
+ splitsBlock.progressPhysicalMemoryKbytes.extend
+ (newProgress,
+ (int)(physicalBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+ }
+ }
+ }
/**
* Set the max number of attempts before we declare a TIP as "failed"
@@ -294,6 +376,7 @@ class TaskInProgress {
return execFinishTime;
}
+
/**
* Set the exec finish time
*/
@@ -582,23 +665,24 @@ class TaskInProgress {
* @return has the task changed its state noticeably?
*/
synchronized boolean updateStatus(TaskStatus status) {
- TaskAttemptID taskid = status.getTaskID();
- String tracker = status.getTaskTracker();
- String diagInfo = status.getDiagnosticInfo();
- TaskStatus oldStatus = taskStatuses.get(taskid);
- boolean changed = true;
- if (diagInfo != null && diagInfo.length() > 0) {
- LOG.info("Error from " + taskid + " on " + tracker + ": "+ diagInfo);
- addDiagnosticInfo(taskid, diagInfo);
- }
-
- if(skipping) {
- failedRanges.updateState(status);
- }
-
- if (oldStatus != null) {
- TaskStatus.State oldState = oldStatus.getRunState();
- TaskStatus.State newState = status.getRunState();
+ try {
+ TaskAttemptID taskid = status.getTaskID();
+ String tracker = status.getTaskTracker();
+ String diagInfo = status.getDiagnosticInfo();
+ TaskStatus oldStatus = taskStatuses.get(taskid);
+ boolean changed = true;
+ if (diagInfo != null && diagInfo.length() > 0) {
+ LOG.info("Error from " + taskid + " on " + tracker + ": "+ diagInfo);
+ addDiagnosticInfo(taskid, diagInfo);
+ }
+
+ if(skipping) {
+ failedRanges.updateState(status);
+ }
+
+ if (oldStatus != null) {
+ TaskStatus.State oldState = oldStatus.getRunState();
+ TaskStatus.State newState = status.getRunState();
// We should never receive a duplicate success/failure/killed
// status update for the same taskid! This is a safety check,
@@ -617,60 +701,63 @@ class TaskInProgress {
return false;
}
- // The task is not allowed to move from completed back to running.
- // We have seen out of order status messagesmoving tasks from complete
- // to running. This is a spot fix, but it should be addressed more
- // globally.
- if ((newState == TaskStatus.State.RUNNING ||
- newState == TaskStatus.State.UNASSIGNED) &&
- (oldState == TaskStatus.State.FAILED ||
- oldState == TaskStatus.State.KILLED ||
- oldState == TaskStatus.State.FAILED_UNCLEAN ||
- oldState == TaskStatus.State.KILLED_UNCLEAN ||
- oldState == TaskStatus.State.SUCCEEDED ||
- oldState == TaskStatus.State.COMMIT_PENDING)) {
- return false;
- }
+ // The task is not allowed to move from completed back to running.
+ // We have seen out of order status messages moving tasks from complete
+ // to running. This is a spot fix, but it should be addressed more
+ // globally.
+ if ((newState == TaskStatus.State.RUNNING ||
+ newState == TaskStatus.State.UNASSIGNED) &&
+ (oldState == TaskStatus.State.FAILED ||
+ oldState == TaskStatus.State.KILLED ||
+ oldState == TaskStatus.State.FAILED_UNCLEAN ||
+ oldState == TaskStatus.State.KILLED_UNCLEAN ||
+ oldState == TaskStatus.State.SUCCEEDED ||
+ oldState == TaskStatus.State.COMMIT_PENDING)) {
+ return false;
+ }
- //Do not accept any status once the task is marked FAILED/KILLED
- //This is to handle the case of the JobTracker timing out a task
- //due to launch delay, but the TT comes back with any state or
- //TT got expired
- if (oldState == TaskStatus.State.FAILED ||
- oldState == TaskStatus.State.KILLED) {
- tasksToKill.put(taskid, true);
- return false;
- }
+ //Do not accept any status once the task is marked FAILED/KILLED
+ //This is to handle the case of the JobTracker timing out a task
+ //due to launch delay, but the TT comes back with any state or
+ //TT got expired
+ if (oldState == TaskStatus.State.FAILED ||
+ oldState == TaskStatus.State.KILLED) {
+ tasksToKill.put(taskid, true);
+ return false;
+ }
- changed = oldState != newState;
- }
- // if task is a cleanup attempt, do not replace the complete status,
- // update only specific fields.
- // For example, startTime should not be updated,
- // but finishTime has to be updated.
- if (!isCleanupAttempt(taskid)) {
- taskStatuses.put(taskid, status);
- //we don't want to include setup tasks in the task execution stats
- if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) ||
- (!isMapTask() && job.hasSpeculativeReduces()))) {
- long now = JobTracker.getClock().getTime();
- double oldProgRate = getOldProgressRate();
- double currProgRate = getCurrentProgressRate(now);
- job.updateStatistics(oldProgRate, currProgRate, isMapTask());
- //we need to store the current progress rate, so that we can
- //update statistics accurately the next time we invoke
- //updateStatistics
- setProgressRate(currProgRate);
+ changed = oldState != newState;
+ }
+ // if task is a cleanup attempt, do not replace the complete status,
+ // update only specific fields.
+ // For example, startTime should not be updated,
+ // but finishTime has to be updated.
+ if (!isCleanupAttempt(taskid)) {
+ taskStatuses.put(taskid, status);
+ //we don't want to include setup tasks in the task execution stats
+ if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) ||
+ (!isMapTask() && job.hasSpeculativeReduces()))) {
+ long now = JobTracker.getClock().getTime();
+ double oldProgRate = getOldProgressRate();
+ double currProgRate = getCurrentProgressRate(now);
+ job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+ //we need to store the current progress rate, so that we can
+ //update statistics accurately the next time we invoke
+ //updateStatistics
+ setProgressRate(currProgRate);
+ }
+ } else {
+ taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+ status.getProgress(), status.getStateString(), status.getPhase(),
+ status.getFinishTime());
}
- } else {
- taskStatuses.get(taskid).statusUpdate(status.getRunState(),
- status.getProgress(), status.getStateString(), status.getPhase(),
- status.getFinishTime());
- }
- // Recompute progress
- recomputeProgress();
- return changed;
+ // Recompute progress
+ recomputeProgress();
+ return changed;
+ } finally {
+ updateProgressSplits(status);
+ }
}
/**
Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java?rev=1157253&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java Fri Aug 12 21:05:10 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.lang.Integer;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+
+public class AvroArrayUtils {
+
+ private static final Schema ARRAY_INT
+ = Schema.createArray(Schema.create(Schema.Type.INT));
+
+ static public GenericArray<Integer> NULL_PROGRESS_SPLITS_ARRAY
+ = new GenericData.Array<Integer>(0, ARRAY_INT);
+
+ public static GenericArray<Integer>
+ toAvro(int values[]) {
+ GenericData.Array<Integer> result
+ = new GenericData.Array<Integer>(values.length, ARRAY_INT);
+
+ for (int i = 0; i < values.length; ++i) {
+ result.add(values[i]);
+ }
+
+ return result;
+ }
+
+ public static int[] fromAvro(GenericArray<Integer> avro) {
+ int[] result = new int[(int)avro.size()];
+
+ int i = 0;
+
+ for (Iterator<Integer> iter = avro.iterator(); iter.hasNext(); ++i) {
+ result[i] = iter.next();
+ }
+
+ return result;
+ }
+}
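A sketch of the round trip these helpers provide (not part of this commit):

    int[] original = {1, 2, 3};
    GenericArray<Integer> avro = AvroArrayUtils.toAvro(original);
    int[] back = AvroArrayUtils.fromAvro(avro);   // {1, 2, 3} again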
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Fri Aug 12 21:05:10 2011
@@ -125,7 +125,11 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "state", "type": "string"},
- {"name": "counters", "type": "JhCounters"}
+ {"name": "counters", "type": "JhCounters"},
+ {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+ {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+ {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+ {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
]
},
@@ -140,7 +144,11 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "state", "type": "string"},
- {"name": "counters", "type": "JhCounters"}
+ {"name": "counters", "type": "JhCounters"},
+ {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+ {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+ {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+ {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
]
},
@@ -176,7 +184,11 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "status", "type": "string"},
- {"name": "error", "type": "string"}
+ {"name": "error", "type": "string"},
+ {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+ {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+ {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+ {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
]
},
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Fri Aug 12 21:05:10 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
import org.apache.avro.util.Utf8;
@@ -48,11 +49,19 @@ public class MapAttemptFinishedEvent im
* @param hostname Name of the host where the map executed
* @param state State string for the attempt
* @param counters Counters for the attempt
+ * @param allSplits the "splits", or a pixelated graph of various
+ * measurable worker node state variables against progress.
+ * Currently there are four: wallclock time, CPU time,
+ * virtual memory, and physical memory.
+ *
+ * If you have no splits data, pass {@code null} for this
+ * parameter.
*/
- public MapAttemptFinishedEvent(TaskAttemptID id,
- TaskType taskType, String taskStatus,
- long mapFinishTime, long finishTime,
- String hostname, String state, Counters counters) {
+ public MapAttemptFinishedEvent
+ (TaskAttemptID id, TaskType taskType, String taskStatus,
+ long mapFinishTime, long finishTime, String hostname,
+ String state, Counters counters,
+ int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
@@ -62,8 +71,46 @@ public class MapAttemptFinishedEvent im
datum.hostname = new Utf8(hostname);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
+
+ datum.clockSplits
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+ datum.cpuUsages
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+ datum.vMemKbytes
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+ datum.physMemKbytes
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+ }
+
+ /**
+ * @deprecated please use the constructor with an additional
+ * argument, an array of splits arrays instead. See
+ * {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+ * for an explanation of the meaning of that parameter.
+ *
+ * Create an event for successful completion of map attempts
+ * @param id Task Attempt ID
+ * @param taskType Type of the task
+ * @param taskStatus Status of the task
+ * @param mapFinishTime Finish time of the map phase
+ * @param finishTime Finish time of the attempt
+ * @param hostname Name of the host where the map executed
+ * @param state State string for the attempt
+ * @param counters Counters for the attempt
+ */
+ @Deprecated
+ public MapAttemptFinishedEvent
+ (TaskAttemptID id, TaskType taskType, String taskStatus,
+ long mapFinishTime, long finishTime, String hostname,
+ String state, Counters counters) {
+ this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, state, counters, null);
}
+
MapAttemptFinishedEvent() {}
public Object getDatum() { return datum; }
@@ -97,5 +144,18 @@ public class MapAttemptFinishedEvent im
public EventType getEventType() {
return EventType.MAP_ATTEMPT_FINISHED;
}
+
+ public int[] getClockSplits() {
+ return AvroArrayUtils.fromAvro(datum.clockSplits);
+ }
+ public int[] getCpuUsages() {
+ return AvroArrayUtils.fromAvro(datum.cpuUsages);
+ }
+ public int[] getVMemKbytes() {
+ return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+ }
+ public int[] getPhysMemKbytes() {
+ return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+ }
}
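A sketch of the compatibility path (not part of this commit): the deprecated constructor forwards null for allSplits, so the new getters decode empty arrays. The attempt id, counters, and times are assumed to be in scope:

    MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
        attemptId, TaskType.MAP, TaskStatus.State.SUCCEEDED.toString(),
        mapFinishTime, finishTime, "host", "SUCCEEDED", counters);
    int[] clockSplits = mfe.getClockSplits();   // length 0 with null splits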
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Fri Aug 12 21:05:10 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+
import org.apache.avro.util.Utf8;
/**
@@ -50,12 +52,16 @@ public class ReduceAttemptFinishedEvent
* @param hostname Name of the host where the attempt executed
* @param state State of the attempt
* @param counters Counters for the attempt
+ * @param allSplits the "splits", or a pixelated graph of various
+ * measurable worker node state variables against progress.
+ * Currently there are four: wallclock time, CPU time,
+ * virtual memory, and physical memory.
*/
- public ReduceAttemptFinishedEvent(TaskAttemptID id,
- TaskType taskType, String taskStatus,
- long shuffleFinishTime, long sortFinishTime,
- long finishTime,
- String hostname, String state, Counters counters) {
+ public ReduceAttemptFinishedEvent
+ (TaskAttemptID id, TaskType taskType, String taskStatus,
+ long shuffleFinishTime, long sortFinishTime, long finishTime,
+ String hostname, String state, Counters counters,
+ int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.attemptId = new Utf8(id.toString());
datum.taskType = new Utf8(taskType.name());
@@ -66,6 +72,45 @@ public class ReduceAttemptFinishedEvent
datum.hostname = new Utf8(hostname);
datum.state = new Utf8(state);
datum.counters = EventWriter.toAvro(counters);
+
+ datum.clockSplits
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+ datum.cpuUsages
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+ datum.vMemKbytes
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+ datum.physMemKbytes
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+ }
+
+ /**
+ * @deprecated please use the constructor with an additional
+ * argument, an array of splits arrays instead. See
+ * {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+ * for an explanation of the meaning of that parameter.
+ *
+ * Create an event to record completion of a reduce attempt
+ * @param id Attempt Id
+ * @param taskType Type of task
+ * @param taskStatus Status of the task
+ * @param shuffleFinishTime Finish time of the shuffle phase
+ * @param sortFinishTime Finish time of the sort phase
+ * @param finishTime Finish time of the attempt
+ * @param hostname Name of the host where the attempt executed
+ * @param state State of the attempt
+ * @param counters Counters for the attempt
+ */
+ public ReduceAttemptFinishedEvent
+ (TaskAttemptID id, TaskType taskType, String taskStatus,
+ long shuffleFinishTime, long sortFinishTime, long finishTime,
+ String hostname, String state, Counters counters) {
+ this(id, taskType, taskStatus,
+ shuffleFinishTime, sortFinishTime, finishTime,
+ hostname, state, counters, null);
}
ReduceAttemptFinishedEvent() {}
@@ -105,4 +150,17 @@ public class ReduceAttemptFinishedEvent
}
+ public int[] getClockSplits() {
+ return AvroArrayUtils.fromAvro(datum.clockSplits);
+ }
+ public int[] getCpuUsages() {
+ return AvroArrayUtils.fromAvro(datum.cpuUsages);
+ }
+ public int[] getVMemKbytes() {
+ return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+ }
+ public int[] getPhysMemKbytes() {
+ return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+ }
+
}
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Fri Aug 12 21:05:10 2011
@@ -27,6 +27,9 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+import org.apache.hadoop.mapred.TaskStatus;
+
import org.apache.avro.util.Utf8;
/**
@@ -47,11 +50,16 @@ public class TaskAttemptUnsuccessfulComp
* @param finishTime Finish time of the attempt
* @param hostname Name of the host where the attempt executed
* @param error Error string
+ * @param allSplits the "splits", or a pixelated graph of various
+ * measurable worker node state variables against progress.
+ * Currently there are four: wallclock time, CPU time,
+ * virtual memory, and physical memory.
*/
- public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
- TaskType taskType,
- String status, long finishTime,
- String hostname, String error) {
+ public TaskAttemptUnsuccessfulCompletionEvent
+ (TaskAttemptID id, TaskType taskType,
+ String status, long finishTime,
+ String hostname, String error,
+ int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.taskType = new Utf8(taskType.name());
datum.attemptId = new Utf8(id.toString());
@@ -59,6 +67,40 @@ public class TaskAttemptUnsuccessfulComp
datum.hostname = new Utf8(hostname);
datum.error = new Utf8(error);
datum.status = new Utf8(status);
+
+ datum.clockSplits
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+ datum.cpuUsages
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+ datum.vMemKbytes
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+ datum.physMemKbytes
+ = AvroArrayUtils.toAvro
+ (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+ }
+
+ /**
+ * @deprecated please use the constructor with an additional
+ * argument, an array of splits arrays instead. See
+ * {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+ * for an explanation of the meaning of that parameter.
+ *
+ * Create an event to record the unsuccessful completion of attempts
+ * @param id Attempt ID
+ * @param taskType Type of the task
+ * @param status Status of the attempt
+ * @param finishTime Finish time of the attempt
+ * @param hostname Name of the host where the attempt executed
+ * @param error Error string
+ */
+ public TaskAttemptUnsuccessfulCompletionEvent
+ (TaskAttemptID id, TaskType taskType,
+ String status, long finishTime,
+ String hostname, String error) {
+ this(id, taskType, status, finishTime, hostname, error, null);
}
TaskAttemptUnsuccessfulCompletionEvent() {}
@@ -101,4 +143,19 @@ public class TaskAttemptUnsuccessfulComp
: EventType.REDUCE_ATTEMPT_KILLED);
}
+
+
+ public int[] getClockSplits() {
+ return AvroArrayUtils.fromAvro(datum.clockSplits);
+ }
+ public int[] getCpuUsages() {
+ return AvroArrayUtils.fromAvro(datum.cpuUsages);
+ }
+ public int[] getVMemKbytes() {
+ return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+ }
+ public int[] getPhysMemKbytes() {
+ return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+ }
+
}
Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Fri Aug 12 21:05:10 2011
@@ -89,6 +89,9 @@ public interface JTConfig extends MRConf
"mapreduce.jobtracker.jobhistory.completed.location";
public static final String JT_JOBHISTORY_LOCATION =
"mapreduce.jobtracker.jobhistory.location";
+ // number of partial task progress reports we retain in job history
+ public static final String JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS =
+ "mapreduce.jobtracker.jobhistory.task.numberprogresssplits";
public static final String JT_AVG_BLACKLIST_THRESHOLD =
"mapreduce.jobtracker.blacklist.average.threshold";
public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";
Added: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java?rev=1157253&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java (added)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java Fri Aug 12 21:05:10 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskPerformanceSplits {
+ @Test
+  public void testPeriodicStats() {
+ PeriodicStatsAccumulator cumulative = new CumulativePeriodicStats(8);
+ PeriodicStatsAccumulator status = new StatePeriodicStats(8);
+
+ cumulative.extend(0.0D, 0);
+    cumulative.extend(0.4375D, 700); // 200 per octant
+    cumulative.extend(0.5625D, 1100); // interpolated value at progress 0.5 is 900
+ cumulative.extend(0.625D, 1300);
+ cumulative.extend(1.0D, 7901);
+
+    int[] results = cumulative.getValues();
+
+ for (int i = 0; i < 8; ++i) {
+      System.err.println("segment " + i + " = " + results[i]);
+ }
+
+ assertEquals("Bad interpolation in cumulative segment 0", 200, results[0]);
+ assertEquals("Bad interpolation in cumulative segment 1", 200, results[1]);
+ assertEquals("Bad interpolation in cumulative segment 2", 200, results[2]);
+ assertEquals("Bad interpolation in cumulative segment 3", 300, results[3]);
+ assertEquals("Bad interpolation in cumulative segment 4", 400, results[4]);
+ assertEquals("Bad interpolation in cumulative segment 5", 2200, results[5]);
+ // these are rounded down
+ assertEquals("Bad interpolation in cumulative segment 6", 2200, results[6]);
+ assertEquals("Bad interpolation in cumulative segment 7", 2201, results[7]);
+
+ status.extend(0.0D, 0);
+ status.extend(1.0D/16.0D, 300); // + 75 for bucket 0
+ status.extend(3.0D/16.0D, 700); // + 200 for 0, +300 for 1
+ status.extend(7.0D/16.0D, 2300); // + 450 for 1, + 1500 for 2, + 1050 for 3
+    status.extend(1.0D, 1400); // +1125 for 3, +2100 for 4, +1900 for 5,
+                               // +1700 for 6, +1500 for 7
+
+ results = status.getValues();
+
+ assertEquals("Bad interpolation in status segment 0", 275, results[0]);
+ assertEquals("Bad interpolation in status segment 1", 750, results[1]);
+ assertEquals("Bad interpolation in status segment 2", 1500, results[2]);
+ assertEquals("Bad interpolation in status segment 3", 2175, results[3]);
+ assertEquals("Bad interpolation in status segment 4", 2100, results[4]);
+ assertEquals("Bad interpolation in status segment 5", 1900, results[5]);
+ assertEquals("Bad interpolation in status segment 6", 1700, results[6]);
+ assertEquals("Bad interpolation in status segment 7", 1500, results[7]);
+ }
+}
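
The expected values in this test follow from straight-line interpolation between successive extend() calls. Taking the first cumulative segment as a worked example (all numbers come from the test itself): the pair extend(0.0, 0) / extend(0.4375, 700) covers 3.5 of the 8 octants and accrues 700 units, so each fully covered octant is credited 700 / 3.5 = 200.

    // Worked arithmetic for cumulative segment 0, mirroring the test:
    double octantsCovered = 0.4375 * 8;            // 3.5 octants
    int perOctant = (int) (700 / octantsCovered);  // 200, as asserted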
Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java (original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java Fri Aug 12 21:05:10 2011
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapreduce.jobhistory;
+import java.util.List;
+import java.util.ArrayList;
+
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -28,6 +31,15 @@ import junit.framework.TestCase;
* Test various jobhistory events
*/
public class TestJobHistoryEvents extends TestCase {
+ static final int[][] NULL_SPLITS_ARRAY
+ = new int[org.apache.hadoop.tools.rumen.LoggedTaskAttempt.SplitVectorKind.values().length][];
+
+ static {
+ for (int i = 0; i < NULL_SPLITS_ARRAY.length; ++i) {
+ NULL_SPLITS_ARRAY[i] = new int[0];
+ }
+ }
+
/**
* Test {@link TaskAttemptStartedEvent} for various task types.
*/
@@ -73,7 +85,8 @@ public class TestJobHistoryEvents extend
String state) {
for (TaskType t : types) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
- new TaskAttemptUnsuccessfulCompletionEvent(id, t, state, 0L, "", "");
+ new TaskAttemptUnsuccessfulCompletionEvent
+ (id, t, state, 0L, "", "", NULL_SPLITS_ARRAY);
assertEquals(expected, tauce.getEventType());
}
}
Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Aug 12 21:05:10 2011
@@ -852,6 +852,30 @@ public class TestRumenJobTraces {
public void testTopologyBuilder() throws Exception {
final TopologyBuilder subject = new TopologyBuilder();
+    // This 4 is the number of vectors returned by TaskInProgress's
+    // ProgressSplitsBlock.burst(), which is not visible here.
+
+ int[][] splits = new int[4][];
+
+ splits[0] = new int[12];
+ splits[1] = new int[12];
+ splits[2] = new int[12];
+ splits[3] = new int[12];
+
+ for (int j = 0; j < 4; ++j) {
+ for (int i = 0; i < 12; ++i) {
+ splits[j][i] = -1;
+ }
+ }
+
+ for (int i = 0; i < 6; ++i) {
+ splits[0][i] = 500000 * i;
+ splits[1][i] = 300000 * i;
+ splits[2][i] = 500000;
+ splits[3][i] = 700000;
+ }
+
// currently we extract no host names from the Properties
subject.process(new Properties());
@@ -860,16 +884,16 @@ public class TestRumenJobTraces {
.valueOf("MAP"), "STATUS", 1234567890L,
"/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
"SUCCESS", null));
- subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
- .forName("attempt_200904211745_0003_m_000004_1"), TaskType
- .valueOf("MAP"), "STATUS", 1234567890L,
- "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
- "MACHINE_EXPLODED"));
- subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
- .forName("attempt_200904211745_0003_m_000004_2"), TaskType
- .valueOf("MAP"), "STATUS", 1234567890L,
- "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
- "MACHINE_EXPLODED"));
+ subject.process(new TaskAttemptUnsuccessfulCompletionEvent
+ (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
+ TaskType.valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+ "MACHINE_EXPLODED", splits));
+ subject.process(new TaskAttemptUnsuccessfulCompletionEvent
+ (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
+ TaskType.valueOf("MAP"), "STATUS", 1234567890L,
+ "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+ "MACHINE_EXPLODED", splits));
subject.process(new TaskStartedEvent(TaskID
.forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
.valueOf("MAP"),
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Fri Aug 12 21:05:10 2011
@@ -476,6 +476,11 @@ public class JobBuilder {
}
attempt.setFinishTime(event.getFinishTime());
+
+ attempt.arraySetClockSplits(event.getClockSplits());
+ attempt.arraySetCpuUsages(event.getCpuUsages());
+ attempt.arraySetVMemKbytes(event.getVMemKbytes());
+ attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
}
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -521,6 +526,10 @@ public class JobBuilder {
attempt.setSortFinished(event.getSortFinishTime());
attempt
.incorporateCounters(((ReduceAttemptFinished) event.getDatum()).counters);
+ attempt.arraySetClockSplits(event.getClockSplits());
+ attempt.arraySetCpuUsages(event.getCpuUsages());
+ attempt.arraySetVMemKbytes(event.getVMemKbytes());
+ attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
}
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -537,7 +546,11 @@ public class JobBuilder {
// is redundant, but making this will add future-proofing.
attempt.setFinishTime(event.getFinishTime());
attempt
- .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+ .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+ attempt.arraySetClockSplits(event.getClockSplits());
+ attempt.arraySetCpuUsages(event.getCpuUsages());
+ attempt.arraySetVMemKbytes(event.getVMemKbytes());
+ attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
}
private void processJobUnsuccessfulCompletionEvent(
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Aug 12 21:05:10 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.tools.rumen;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -71,10 +73,118 @@ public class LoggedTaskAttempt implement
// Initialize to default object for backward compatibility
ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+ List<Integer> clockSplits = new ArrayList<Integer>();
+ List<Integer> cpuUsages = new ArrayList<Integer>();
+ List<Integer> vMemKbytes = new ArrayList<Integer>();
+ List<Integer> physMemKbytes = new ArrayList<Integer>();
+
LoggedTaskAttempt() {
super();
}
+  // Carries the kinds of splits vectors a LoggedTaskAttempt holds.
+  //
+  // Each enum constant has the following methods:
+  //   get(LoggedTaskAttempt attempt)
+  //     returns the List<Integer> for the corresponding value field
+  //   set(LoggedTaskAttempt attempt, List<Integer> newValue)
+  //     sets that field
+  // There is also a pair of methods, get(List<List<Integer>>) and
+  // set(List<List<Integer>>, List<Integer>), which respectively read or
+  // write the appropriate element of a List<List<Integer>>.
+  // This makes it easier to add another kind in the future.
+ public enum SplitVectorKind {
+
+ WALLCLOCK_TIME {
+ @Override
+ public List<Integer> get(LoggedTaskAttempt attempt) {
+ return attempt.getClockSplits();
+ }
+ @Override
+ public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+ attempt.setClockSplits(newValue);
+ }
+ },
+
+ CPU_USAGE {
+ @Override
+ public List<Integer> get(LoggedTaskAttempt attempt) {
+ return attempt.getCpuUsages();
+ }
+ @Override
+ public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+ attempt.setCpuUsages(newValue);
+ }
+ },
+
+ VIRTUAL_MEMORY_KBYTES {
+ @Override
+ public List<Integer> get(LoggedTaskAttempt attempt) {
+ return attempt.getVMemKbytes();
+ }
+ @Override
+ public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+ attempt.setVMemKbytes(newValue);
+ }
+ },
+
+ PHYSICAL_MEMORY_KBYTES {
+ @Override
+ public List<Integer> get(LoggedTaskAttempt attempt) {
+ return attempt.getPhysMemKbytes();
+ }
+ @Override
+ public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+ attempt.setPhysMemKbytes(newValue);
+ }
+ };
+
+    private static final List<List<Integer>> NULL_SPLITS_VECTOR
+ = new ArrayList<List<Integer>>();
+
+ static {
+      for (SplitVectorKind kind : SplitVectorKind.values()) {
+ NULL_SPLITS_VECTOR.add(new ArrayList<Integer>());
+ }
+ }
+
+    public abstract List<Integer> get(LoggedTaskAttempt attempt);
+
+    public abstract void set(LoggedTaskAttempt attempt, List<Integer> newValue);
+
+ public List<Integer> get(List<List<Integer>> listSplits) {
+ return listSplits.get(this.ordinal());
+ }
+
+ public void set(List<List<Integer>> listSplits, List<Integer> newValue) {
+ listSplits.set(this.ordinal(), newValue);
+ }
+
+    public static List<List<Integer>> getNullSplitsVector() {
+ return NULL_SPLITS_VECTOR;
+ }
+ }
+
+  /**
+   * @return a list of all splits vectors, ordered by the ordinals of
+   *          the constants in {@link SplitVectorKind}. Do NOT index into
+   *          the returned list with hard-coded indices; use
+   *          {@code SplitVectorKind.get(LoggedTaskAttempt)} instead.
+   */
+ public List<List<Integer>> allSplitVectors() {
+ List<List<Integer>> result
+ = new ArrayList<List<Integer>>(SplitVectorKind.values().length);
+
+    for (SplitVectorKind kind : SplitVectorKind.values()) {
+ result.add(kind.get(this));
+ }
+
+ return result;
+ }
+
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
@@ -89,6 +199,78 @@ public class LoggedTaskAttempt implement
}
}
+ public List<Integer> getClockSplits() {
+ return clockSplits;
+ }
+
+ void setClockSplits(List<Integer> clockSplits) {
+ this.clockSplits = clockSplits;
+ }
+
+ void arraySetClockSplits(int[] clockSplits) {
+ List<Integer> result = new ArrayList<Integer>();
+
+ for (int i = 0; i < clockSplits.length; ++i) {
+ result.add(clockSplits[i]);
+ }
+
+ this.clockSplits = result;
+ }
+
+ public List<Integer> getCpuUsages() {
+ return cpuUsages;
+ }
+
+ void setCpuUsages(List<Integer> cpuUsages) {
+ this.cpuUsages = cpuUsages;
+ }
+
+ void arraySetCpuUsages(int[] cpuUsages) {
+ List<Integer> result = new ArrayList<Integer>();
+
+ for (int i = 0; i < cpuUsages.length; ++i) {
+ result.add(cpuUsages[i]);
+ }
+
+ this.cpuUsages = result;
+ }
+
+ public List<Integer> getVMemKbytes() {
+ return vMemKbytes;
+ }
+
+ void setVMemKbytes(List<Integer> vMemKbytes) {
+ this.vMemKbytes = vMemKbytes;
+ }
+
+ void arraySetVMemKbytes(int[] vMemKbytes) {
+ List<Integer> result = new ArrayList<Integer>();
+
+ for (int i = 0; i < vMemKbytes.length; ++i) {
+ result.add(vMemKbytes[i]);
+ }
+
+ this.vMemKbytes = result;
+ }
+
+ public List<Integer> getPhysMemKbytes() {
+ return physMemKbytes;
+ }
+
+ void setPhysMemKbytes(List<Integer> physMemKbytes) {
+ this.physMemKbytes = physMemKbytes;
+ }
+
+ void arraySetPhysMemKbytes(int[] physMemKbytes) {
+ List<Integer> result = new ArrayList<Integer>();
+
+ for (int i = 0; i < physMemKbytes.length; ++i) {
+ result.add(physMemKbytes[i]);
+ }
+
+ this.physMemKbytes = result;
+ }
+
void adjustTimes(long adjustment) {
startTime += adjustment;
finishTime += adjustment;
@@ -480,6 +662,26 @@ public class LoggedTaskAttempt implement
c1.deepCompare(c2, recurse);
}
+ private void compare1(List<Integer> c1, List<Integer> c2, TreePath loc,
+ String eltname)
+ throws DeepInequalityException {
+ if (c1 == null && c2 == null) {
+ return;
+ }
+
+ if (c1 == null || c2 == null || c1.size() != c2.size()) {
+ throw new DeepInequalityException
+ (eltname + " miscompared", new TreePath(loc, eltname));
+ }
+
+ for (int i = 0; i < c1.size(); ++i) {
+ if (!c1.get(i).equals(c2.get(i))) {
+ throw new DeepInequalityException("" + c1.get(i) + " != " + c2.get(i),
+ new TreePath(loc, eltname, i));
+ }
+ }
+ }
+
public void deepCompare(DeepCompare comparand, TreePath loc)
throws DeepInequalityException {
if (!(comparand instanceof LoggedTaskAttempt)) {
@@ -518,5 +720,10 @@ public class LoggedTaskAttempt implement
compare1(sortFinished, other.sortFinished, loc, "sortFinished");
compare1(location, other.location, loc, "location");
+
+ compare1(clockSplits, other.clockSplits, loc, "clockSplits");
+ compare1(cpuUsages, other.cpuUsages, loc, "cpuUsages");
+ compare1(vMemKbytes, other.vMemKbytes, loc, "vMemKbytes");
+ compare1(physMemKbytes, other.physMemKbytes, loc, "physMemKbytes");
}
}
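
SplitVectorKind exists so callers never bake the ordering of the four vectors into their code. A usage sketch against the accessors added above, assuming `attempt` is an already-built LoggedTaskAttempt from a parsed Rumen trace (the variable is a placeholder):

    import java.util.List;

    // Read every splits vector without hard-coding list indices.
    List<List<Integer>> all = attempt.allSplitVectors();

    for (LoggedTaskAttempt.SplitVectorKind kind
             : LoggedTaskAttempt.SplitVectorKind.values()) {
      // kind.get(all) and kind.get(attempt) return the same vector.
      List<Integer> vector = kind.get(all);
      System.out.println(kind + ": " + vector.size() + " intervals");
    }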
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java Fri Aug 12 21:05:10 2011
@@ -68,10 +68,13 @@ public class MapAttempt20LineHistoryEven
(MapAttempt20LineHistoryEventEmitter) thatg;
if (finishTime != null && "success".equalsIgnoreCase(status)) {
- return new MapAttemptFinishedEvent(taskAttemptID,
- that.originalTaskType, status, Long.parseLong(finishTime), Long
- .parseLong(finishTime), hostName, state,
- maybeParseCounters(counters));
+ return new MapAttemptFinishedEvent
+ (taskAttemptID,
+ that.originalTaskType, status,
+ Long.parseLong(finishTime),
+ Long.parseLong(finishTime),
+ hostName, state, maybeParseCounters(counters),
+ null);
}
}
@@ -88,5 +91,4 @@ public class MapAttempt20LineHistoryEven
List<SingleEventEmitter> nonFinalSEEs() {
return nonFinals;
}
-
}
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java Fri Aug 12 21:05:10 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.tools.rumen;
+import java.util.List;
+
import org.apache.hadoop.mapred.TaskStatus.State;
/**
@@ -26,11 +28,33 @@ import org.apache.hadoop.mapred.TaskStat
public class MapTaskAttemptInfo extends TaskAttemptInfo {
private long runtime;
- public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) {
- super(state, taskInfo);
+ public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+ long runtime, List<List<Integer>> allSplits) {
+ super(state, taskInfo,
+ allSplits == null
+ ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+ : allSplits);
this.runtime = runtime;
}
+  /**
+   * @deprecated please use the constructor with
+   *               {@code (state, taskInfo, runtime,
+   *                       List<List<Integer>> allSplits)}
+   *             instead.
+   *
+   * See {@link LoggedTaskAttempt} for an explanation of
+   *        {@code allSplits}.
+   *
+   * If there are no known splits, use {@code null}.
+   */
+ @Deprecated
+ public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+ long runtime) {
+ this(state, taskInfo, runtime, null);
+ }
+
@Override
public long getRuntime() {
return getMapRuntime();
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java Fri Aug 12 21:05:10 2011
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
-public class ReduceAttempt20LineHistoryEventEmitter extends
- TaskAttempt20LineEventEmitter {
+public class ReduceAttempt20LineHistoryEventEmitter
+ extends TaskAttempt20LineEventEmitter {
static List<SingleEventEmitter> nonFinals =
new LinkedList<SingleEventEmitter>();
@@ -71,10 +71,15 @@ public class ReduceAttempt20LineHistoryE
ReduceAttempt20LineHistoryEventEmitter that =
(ReduceAttempt20LineHistoryEventEmitter) thatg;
- return new ReduceAttemptFinishedEvent(taskAttemptID,
- that.originalTaskType, status, Long.parseLong(shuffleFinish),
- Long.parseLong(sortFinish), Long.parseLong(finishTime), hostName,
- state, maybeParseCounters(counters));
+ return new ReduceAttemptFinishedEvent
+ (taskAttemptID,
+ that.originalTaskType, status,
+ Long.parseLong(shuffleFinish),
+ Long.parseLong(sortFinish),
+ Long.parseLong(finishTime),
+ hostName,
+ state, maybeParseCounters(counters),
+ null);
}
}
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Fri Aug 12 21:05:10 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.tools.rumen;
+import java.util.List;
+
import org.apache.hadoop.mapred.TaskStatus.State;
/**
@@ -29,13 +31,35 @@ public class ReduceTaskAttemptInfo exten
private long reduceTime;
public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
- long mergeTime, long reduceTime) {
- super(state, taskInfo);
+ long mergeTime, long reduceTime, List<List<Integer>> allSplits) {
+ super(state, taskInfo,
+ allSplits == null
+ ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+ : allSplits);
this.shuffleTime = shuffleTime;
this.mergeTime = mergeTime;
this.reduceTime = reduceTime;
}
+
+  /**
+   * @deprecated please use the constructor with
+   *               {@code (state, taskInfo, shuffleTime, mergeTime, reduceTime,
+   *                       List<List<Integer>> allSplits)}
+   *             instead.
+   *
+   * See {@link LoggedTaskAttempt} for an explanation of
+   *        {@code allSplits}.
+   *
+   * If there are no known splits, use {@code null}.
+   */
+ @Deprecated
+ public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+ long mergeTime, long reduceTime) {
+ this(state, taskInfo, shuffleTime, mergeTime, reduceTime, null);
+ }
+
/**
* Get the runtime for the <b>reduce</b> phase of the reduce task-attempt.
*
@@ -67,5 +91,4 @@ public class ReduceTaskAttemptInfo exten
public long getRuntime() {
return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
}
-
}
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Fri Aug 12 21:05:10 2011
@@ -138,9 +138,10 @@ public abstract class TaskAttempt20LineE
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
- return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
- that.originalTaskType, status, Long.parseLong(finishTime),
- hostName, error);
+ return new TaskAttemptUnsuccessfulCompletionEvent
+ (taskAttemptID,
+ that.originalTaskType, status, Long.parseLong(finishTime),
+ hostName, error, null);
}
return null;
Modified: hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=1157253&r1=1157252&r2=1157253&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (original)
+++ hadoop/common/trunk/mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Fri Aug 12 21:05:10 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.tools.rumen;
+import java.util.List;
+
import org.apache.hadoop.mapred.TaskStatus.State;
/**
@@ -27,13 +29,22 @@ public abstract class TaskAttemptInfo {
protected final State state;
protected final TaskInfo taskInfo;
- protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+ protected final List<List<Integer>> allSplits;
+
+ protected TaskAttemptInfo
+ (State state, TaskInfo taskInfo, List<List<Integer>> allSplits) {
if (state == State.SUCCEEDED || state == State.FAILED) {
this.state = state;
} else {
throw new IllegalArgumentException("status cannot be " + state);
}
this.taskInfo = taskInfo;
+ this.allSplits = allSplits;
+ }
+
+ protected TaskAttemptInfo
+ (State state, TaskInfo taskInfo) {
+ this(state, taskInfo, LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector());
}
/**
@@ -60,4 +71,8 @@ public abstract class TaskAttemptInfo {
public TaskInfo getTaskInfo() {
return taskInfo;
}
+
+ public List<Integer> getSplitVector(LoggedTaskAttempt.SplitVectorKind kind) {
+ return kind.get(allSplits);
+ }
}
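
Tying the pieces together: simulation code builds a TaskAttemptInfo with the full splits matrix and reads individual vectors back by kind rather than by index. A hedged sketch (`taskInfo`, `attempt`, and the 42-second runtime are placeholders, not values from this patch):

    import java.util.List;
    import org.apache.hadoop.mapred.TaskStatus.State;

    // Sketch only: `taskInfo` and `attempt` are assumed to exist.
    TaskAttemptInfo info = new MapTaskAttemptInfo(
        State.SUCCEEDED, taskInfo, 42000L, attempt.allSplitVectors());

    List<Integer> wallclock =
        info.getSplitVector(LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME);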