Author: omalley
Date: Tue Mar 8 05:53:36 2011
New Revision: 1079189
URL: http://svn.apache.org/viewvc?rev=1079189&view=rev
Log:
commit 68c2c71c91fc1bd1526d6c29e367b54e3c9a241f
Author: Richard King <dking@yahoo-inc.com>
Date: Wed Nov 24 18:18:28 2010 +0000
These are the added files for my ill-fated attempt to add MR2037.
Added:
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ * measurements where the raw data are a measurement of an
+ * accumulation. The result in each bucket is the estimate
+ * of the progress-weighted change in that quantity over the
+ * progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ * a distance traveled. It makes sense to consider that portion of
+ * the total travel that can be apportioned to each bucket.
+ *
+ */
+class CumulativePeriodicStats extends PeriodicStatsAccumulator {
+ // int's are acceptable here, even though times are normally
+ // long's, because these are a difference and an int won't
+ // overflow for 24 days. Tasks can't run for more than about a
+ // week for other reasons, and most jobs would be written off as stuck long before then.
+ int previousValue = 0;
+
+ CumulativePeriodicStats(int count) {
+ super(count);
+ }
+
+ /**
+ *
+ * accumulates a new reading by keeping a running account of the
+ * value distance from the beginning of the bucket to the end of
+ * this reading
+ */
+ @Override
+ protected void extendInternal(double newProgress, int newValue) {
+ if (state == null) {
+ return;
+ }
+
+ state.currentAccumulation += (double)(newValue - previousValue);
+ previousValue = newValue;
+ }
+}
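For illustration (not part of this commit), a minimal sketch of how cumulative readings flow through this accumulator. The driver class is hypothetical, and it assumes same-package access, since the class and the extend() method (defined in PeriodicStatsAccumulator, below) are not public:

package org.apache.hadoop.mapred;

public class CumulativeStatsExample {
  public static void main(String[] args) {
    // Four buckets covering progress [0, 0.25), [0.25, 0.5), and so on.
    CumulativePeriodicStats cpuTime = new CumulativePeriodicStats(4);

    cpuTime.extend(0.0D, 0);     // baseline reading
    cpuTime.extend(0.5D, 1000);  // 1000 ms of CPU consumed by midpoint
    cpuTime.extend(1.0D, 1600);  // 600 more ms over the second half

    // Each bucket holds the change apportioned to its progress range;
    // linear interpolation yields [500, 500, 300, 300] here.
    for (int i = 0; i < cpuTime.getCount(); ++i) {
      System.out.println("bucket " + i + " = " + cpuTime.get(i));
    }
  }
}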
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This abstract class represents a bucketed series of
+ * measurements of a quantity being measured in a running task
+ * attempt.
+ *
+ * <p>The sole constructor is called with a count, which is the
+ * number of buckets into which we evenly divide the spectrum of
+ * progress from 0.0D to 1.0D . In the future we may provide for
+ * custom split points that don't have to be uniform.
+ *
+ * <p>A subclass determines how we fold readings for portions of a
+ * bucket and how we interpret the readings by overriding
+ * {@code extendInternal(...)} and {@code initializeInterval()}
+ */
+public abstract class PeriodicStatsAccumulator {
+ // The range of progress from 0.0D through 1.0D is divided into
+ // count "progress segments". This object accumulates an
+ // estimate of the effective value of a time-varying quantity during
+ // the zero-based i'th progress segment, ranging from i/count
+ // through (i+1)/count .
+ // This is an abstract class. We have two implementations: one
+ // for monotonically increasing time-dependent variables
+ // [currently, CPU time in milliseconds and wallclock time in
+ // milliseconds] and one for quantities that can vary arbitrarily
+ // over time, currently virtual and physical memory used, in
+ // kilobytes.
+ // We carry int's here. This saves a lot of JVM heap space in the
+ // job tracker per running task attempt [200 bytes per] but it
+ // has a small downside.
+ // No task attempt can run for more than 57 days nor occupy more
+ // than two terabytes of virtual memory.
+ protected final int count;
+ protected final int[] values;
+
+ static class StatsetState {
+ int oldValue = 0;
+ double oldProgress = 0.0D;
+
+ double currentAccumulation = 0.0D;
+ }
+
+ // We provide this level of indirection to reduce the memory
+ // footprint of done task attempts. When a task's progress
+ // reaches 1.0D, we discard this object's StatsetState.
+ StatsetState state = new StatsetState();
+
+ PeriodicStatsAccumulator(int count) {
+ this.count = count;
+ this.values = new int[count];
+ for (int i = 0; i < count; ++i) {
+ values[i] = -1;
+ }
+ }
+
+ protected int[] getValues() {
+ return values;
+ }
+
+ // The concrete implementation of this abstract function
+ // accumulates more data into the current progress segment.
+ // newProgress [from the call] and oldProgress [from the object]
+ // must be in [or at the border of] a single progress segment.
+ /**
+ *
+ * adds a new reading to the current bucket.
+ *
+ * @param newProgress the endpoint of the interval this new
+ * reading covers
+ * @param newValue the value of the reading at {@code newProgress}
+ *
+ * The state object has three variables: {@code oldProgress},
+ * {@code oldValue}, and {@code currentAccumulation}.
+ *
+ * {@code extendInternal} can count on three things:
+ *
+ * 1: The first time it's called in a particular instance, both
+ * oldXXX's will be zero.
+ *
+ * 2: oldXXX for a later call is the value of newXXX of the
+ * previous call. This ensures continuity in accumulation from
+ * one call to the next.
+ *
+ * 3: {@code currentAccumulation} is owned by
+ * {@code initializeInterval} and {@code extendInternal}.
+ */
+ protected abstract void extendInternal(double newProgress, int newValue);
+
+ // What has to be done when you open a new interval
+ /**
+ * initializes the state variables to be ready for a new interval
+ */
+ protected void initializeInterval() {
+ state.currentAccumulation = 0.0D;
+ }
+
+ // called for each new reading
+ /**
+ * This method calls {@code extendInternal} at least once. It
+ * divides the current progress interval [from the last call's
+ * {@code newProgress} to this call's {@code newProgress} ]
+ * into one or more subintervals, splitting at each bucket
+ * boundary that falls within the interval, if any.  It
+ * then calls {@code extendInternal} for each subinterval, or the
+ * whole interval if there are no splitting points.
+ *
+ * <p>For example, if the value was {@code 300} last time with
+ * {@code 0.3} progress, and count is {@code 5}, and you get a
+ * new reading with the variable at {@code 700} and progress at
+ * {@code 0.7}, you get three calls to {@code extendInternal}:
+ * one extending from progress {@code 0.3} to {@code 0.4} [the
+ * next boundary] with a value of {@code 400}, the next one
+ * through {@code 0.6} with a value of {@code 600}, and finally
+ * one at {@code 700} with a progress of {@code 0.7} .
+ *
+ * @param newProgress the endpoint of the progress range this new
+ * reading covers
+ * @param newValue the value of the reading at {@code newProgress}
+ */
+ protected void extend(double newProgress, int newValue) {
+ if (state == null || newProgress < state.oldProgress) {
+ return;
+ }
+
+ // The correctness of this code depends on 1.0D * count == count.
+ int oldIndex = (int)(state.oldProgress * count);
+ int newIndex = (int)(newProgress * count);
+ int originalOldValue = state.oldValue;
+
+ double fullValueDistance = (double)newValue - state.oldValue;
+ double fullProgressDistance = newProgress - state.oldProgress;
+ double originalOldProgress = state.oldProgress;
+
+ // In this loop we detect each subinterval boundary within the
+ // range from the old progress to the new one. Then we
+ // interpolate the value from the old value to the new one to
+ // infer what its value might have been at each such boundary.
+ // Lastly we make the necessary calls to extendInternal to fold
+ // in the data for each trapezoid where no such trapezoid
+ // crosses a boundary.
+ for (int closee = oldIndex; closee < newIndex; ++closee) {
+ double interpolationProgress = (double)(closee + 1) / count;
+ // In floats, x * y / y might not equal x.
+ interpolationProgress = Math.min(interpolationProgress, newProgress);
+
+ double progressLength = (interpolationProgress - originalOldProgress);
+ double interpolationProportion = progressLength / fullProgressDistance;
+
+ double interpolationValueDistance
+ = fullValueDistance * interpolationProportion;
+
+ // estimates the value at the next [interpolated] subsegment boundary
+ int interpolationValue
+ = (int)interpolationValueDistance + originalOldValue;
+
+ extendInternal(interpolationProgress, interpolationValue);
+
+ advanceState(interpolationProgress, interpolationValue);
+
+ values[closee] = (int)state.currentAccumulation;
+ initializeInterval();
+
+ }
+
+ extendInternal(newProgress, newValue);
+ advanceState(newProgress, newValue);
+
+ if (newIndex == count) {
+ state = null;
+ }
+ }
+
+ protected void advanceState(double newProgress, int newValue) {
+ state.oldValue = newValue;
+ state.oldProgress = newProgress;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ int get(int index) {
+ return values[index];
+ }
+}
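To make the boundary-splitting in extend() concrete, here is an editorial sketch (not part of this commit; the driver class is hypothetical and assumes same-package access) that reproduces the count-of-5 example from the javadoc above:

package org.apache.hadoop.mapred;

public class ExtendExample {
  public static void main(String[] args) {
    // Five buckets, so boundaries fall at 0.2, 0.4, 0.6, and 0.8.
    PeriodicStatsAccumulator acc = new CumulativePeriodicStats(5);

    // Crosses the 0.2 boundary: extendInternal() sees the interpolated
    // reading (0.2, 200), then (0.3, 300).
    acc.extend(0.3D, 300);

    // Crosses 0.4 and 0.6: extendInternal() is called three times,
    // with (0.4, 400), (0.6, 600), and finally (0.7, 700).
    acc.extend(0.7D, 700);
  }
}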
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/*
+ * This object packages the [currently four] PeriodicStatsAccumulators
+ * that we gather for a particular task attempt, so they can be
+ * handled as a single object.
+ */
+public class ProgressSplitsBlock {
+ final PeriodicStatsAccumulator progressWallclockTime;
+ final PeriodicStatsAccumulator progressCPUTime;
+ final PeriodicStatsAccumulator progressVirtualMemoryKbytes;
+ final PeriodicStatsAccumulator progressPhysicalMemoryKbytes;
+
+ static final int[] NULL_ARRAY = new int[0];
+
+ static final int WALLCLOCK_TIME_INDEX = 0;
+ static final int CPU_TIME_INDEX = 1;
+ static final int VIRTUAL_MEMORY_KBYTES_INDEX = 2;
+ static final int PHYSICAL_MEMORY_KBYTES_INDEX = 3;
+
+ static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
+
+ ProgressSplitsBlock(int numberSplits) {
+ progressWallclockTime
+ = new CumulativePeriodicStats(numberSplits);
+ progressCPUTime
+ = new CumulativePeriodicStats(numberSplits);
+ progressVirtualMemoryKbytes
+ = new StatePeriodicStats(numberSplits);
+ progressPhysicalMemoryKbytes
+ = new StatePeriodicStats(numberSplits);
+ }
+
+ // this coordinates with LoggedTaskAttempt.SplitVectorKind
+ int[][] burst() {
+ int[][] result = new int[4][];
+
+ result[WALLCLOCK_TIME_INDEX] = progressWallclockTime.getValues();
+ result[CPU_TIME_INDEX] = progressCPUTime.getValues();
+ result[VIRTUAL_MEMORY_KBYTES_INDEX] = progressVirtualMemoryKbytes.getValues();
+ result[PHYSICAL_MEMORY_KBYTES_INDEX] = progressPhysicalMemoryKbytes.getValues();
+
+ return result;
+ }
+
+ static public int[] arrayGet(int[][] burstedBlock, int index) {
+ return burstedBlock == null ? NULL_ARRAY : burstedBlock[index];
+ }
+
+ static public int[] arrayGetWallclockTime(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, WALLCLOCK_TIME_INDEX);
+ }
+
+ static public int[] arrayGetCPUTime(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, CPU_TIME_INDEX);
+ }
+
+ static public int[] arrayGetVMemKbytes(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, VIRTUAL_MEMORY_KBYTES_INDEX);
+ }
+
+ static public int[] arrayGetPhysMemKbytes(int[][] burstedBlock) {
+ return arrayGet(burstedBlock, PHYSICAL_MEMORY_KBYTES_INDEX);
+ }
+}
+
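For illustration (not part of this commit), a sketch of the intended flow with hypothetical readings; same-package access is assumed because the constructor, the statset fields, and burst() are not public:

package org.apache.hadoop.mapred;

public class SplitsBlockExample {
  public static void main(String[] args) {
    ProgressSplitsBlock splits = new ProgressSplitsBlock(
        ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);

    // As the task attempt reports progress, each statset is extended.
    splits.progressWallclockTime.extend(0.5D, 30000);          // 30 s so far
    splits.progressVirtualMemoryKbytes.extend(0.5D, 1048576);  // 1 GB virtual

    // burst() flattens all four series for history logging; the
    // arrayGet* helpers index the burst symmetrically on the read side.
    int[][] burst = splits.burst();
    int[] wallclock = ProgressSplitsBlock.arrayGetWallclockTime(burst);
    System.out.println("wallclock buckets: " + wallclock.length);
  }
}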
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ * measurements where the raw data are a measurement of a
+ * time-varying quantity. The result in each bucket is the estimate
+ * of the progress-weighted mean value of that quantity over the
+ * progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ * a temperature. It makes sense to consider the mean temperature
+ * over a progress range.
+ *
+ */
+class StatePeriodicStats extends PeriodicStatsAccumulator {
+ StatePeriodicStats(int count) {
+ super(count);
+ }
+
+ /**
+ *
+ * accumulates a new reading by keeping a running account of the
+ * area under the piecewise linear curve marked by pairs of
+ * {@code newProgress, newValue} .
+ */
+ @Override
+ protected void extendInternal(double newProgress, int newValue) {
+ if (state == null) {
+ return;
+ }
+
+ // the effective height of this trapezoid if rectangularized
+ double mean = ((double)newValue + (double)state.oldValue)/2.0D;
+
+ // conceptually mean * (newProgress - state.oldProgress) / (1 / count)
+ state.currentAccumulation += mean * (newProgress - state.oldProgress) * count;
+ }
+}
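For illustration (not part of this commit), a minimal sketch with hypothetical readings: a value rising linearly from 0 to 400 averages 100 over the first half of the run and 300 over the second, which is exactly what lands in the two buckets:

package org.apache.hadoop.mapred;

public class StateStatsExample {
  public static void main(String[] args) {
    // Two buckets with a boundary at progress 0.5.
    StatePeriodicStats memory = new StatePeriodicStats(2);

    memory.extend(0.0D, 0);
    memory.extend(1.0D, 400);  // linear rise over the whole run

    // Progress-weighted means per bucket: [100, 300].
    System.out.println(memory.get(0) + ", " + memory.get(1));
  }
}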
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+
+public class AvroArrayUtils {
+
+ private static final Schema ARRAY_INT
+ = Schema.createArray(Schema.create(Schema.Type.INT));
+
+ static public GenericArray<Integer> NULL_PROGRESS_SPLITS_ARRAY
+ = new GenericData.Array<Integer>(0, ARRAY_INT);
+
+ public static GenericArray<Integer>
+ toAvro(int values[]) {
+ GenericData.Array<Integer> result
+ = new GenericData.Array<Integer>(values.length, ARRAY_INT);
+
+ for (int i = 0; i < values.length; ++i) {
+ result.add(values[i]);
+ }
+
+ return result;
+ }
+
+ public static int[] fromAvro(GenericArray<Integer> avro) {
+ int[] result = new int[(int)avro.size()];
+
+ int i = 0;
+
+ for (Iterator<Integer> iter = avro.iterator(); iter.hasNext(); ++i) {
+ result[i] = iter.next();
+ }
+
+ return result;
+ }
+}
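For illustration (not part of this commit), a round-trip sketch; the driver class and its values are hypothetical:

package org.apache.hadoop.mapreduce.jobhistory;

import java.util.Arrays;

import org.apache.avro.generic.GenericArray;

public class AvroArrayUtilsExample {
  public static void main(String[] args) {
    int[] splits = { 200, 200, 300, 400 };

    // int[] -> GenericArray<Integer> -> int[] should be lossless.
    GenericArray<Integer> avro = AvroArrayUtils.toAvro(splits);
    int[] back = AvroArrayUtils.fromAvro(avro);

    System.out.println(Arrays.equals(splits, back));  // true
  }
}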
Added: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
+
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever;
+
+import org.apache.hadoop.tools.rumen.TraceBuilder;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+import org.apache.hadoop.tools.rumen.ZombieJob;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ZombieCluster;
+import org.apache.hadoop.tools.rumen.MachineNode;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskPerformanceSplitTranscription {
+ // This testcase runs a job in a mini cluster, and then it verifies
+ // that splits are stored in the resulting trace, and also
+ // retrievable from the ZombieJob resulting from reading the trace.
+ //
+ // We can't test for any particular values, unfortunately.
+ @Test
+ public void testTranscription() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ final Path tempDir = new Path(rootTempDir, "testTranscription");
+ lfs.delete(tempDir, true);
+
+ // Run a MR job
+ // create a MR cluster
+ conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+ conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
+ final MiniMRCluster mrCluster
+ = new MiniMRCluster(1, "file:///", 1, null, null,
+ new JobConf(conf));
+
+ final JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
+ final JobHistory history = tracker.getJobHistory();
+
+ // run a job
+ Path inDir = new Path(tempDir, "input");
+ Path outDir = new Path(tempDir, "output");
+
+ ZombieJobProducer story = null;
+
+ boolean success = false;
+
+ try {
+ JobConf jConf = mrCluster.createJobConf();
+ // construct a job with 1 map and 1 reduce task.
+ Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 1, 1);
+ // disable setup/cleanup
+ job.setJobSetupCleanupNeeded(false);
+ // set the output format to take care of the _temporary folder
+ job.setOutputFormatClass(MyOutputFormat.class);
+ // wait for the job to complete
+ job.waitForCompletion(false);
+
+ assertTrue("Job failed", job.isSuccessful());
+
+ JobID id = job.getJobID();
+
+ Path inputPath = null;
+ // wait for 10 secs for the jobhistory file to move into the done folder
+ for (int i = 0; i < 100; ++i) {
+ JobHistoryRecordRetriever retriever
+ = history.getMatchingJobs(null, "", null, id.toString());
+ if (retriever.hasNext()) {
+ inputPath = retriever.next().getPath();
+ System.out.println("History file = " + inputPath);
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ assertTrue("Missing job history file", lfs.exists(inputPath));
+
+ System.out.println("testTranscription() input history file is "
+ + inputPath.toString());
+
+ final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+ final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+ System.err.println("testTranscription() output .json file is "
+ + tracePath.toString());
+
+ String[] args =
+ { tracePath.toString(), topologyPath.toString(), inputPath.toString() };
+
+ Tool analyzer = new TraceBuilder();
+ int result = ToolRunner.run(analyzer, args);
+ assertEquals("Non-zero exit", 0, result);
+
+ MachineNode.Builder builder = new MachineNode.Builder("node.megacorp.com", 0);
+ MachineNode node = builder.build();
+ ZombieCluster cluster = new ZombieCluster(topologyPath, node, jConf);
+
+ story = new ZombieJobProducer(tracePath, cluster, jConf);
+
+ // test that the logged* has everything down to the split vector
+
+ ZombieJob theZombieJob = story.getNextJob();
+ LoggedJob theJob = theZombieJob.getLoggedJob();
+ LoggedTask firstMapTask = theJob.getMapTasks().get(0);
+ LoggedTaskAttempt firstAttempt = firstMapTask.getAttempts().get(0);
+
+ assertTrue("No clock splits were stored",
+ firstAttempt.getClockSplits().size() > 0);
+
+ TaskAttemptInfo attemptInfo
+ = theZombieJob.getTaskAttemptInfo(TaskType.MAP, 0, 0);
+
+ assertEquals("Can't retrieve clock splits from the LoggedTaskAttempt",
+ attemptInfo.getSplitVector
+ (LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME)
+ .size(),
+ ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);
+
+ // test that the ZombieJob can deliver splits vectors
+
+ TaskAttemptInfo tinfo = theZombieJob.getTaskAttemptInfo(TaskType.MAP, 0, 0);
+ List<Integer> splitVector
+ = tinfo.getSplitVector(LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME);
+
+ assertEquals("Can't retrieve clock splits from the ZombieJob",
+ splitVector.size(), ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);
+
+ success = true;
+ } finally {
+ if (success) {
+ lfs.delete(tempDir, true);
+ }
+
+ if (story != null) {
+ story.close();
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
Tue Mar 8 05:53:36 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskPerformanceSplits {
+ @Test
+ public void testPeriodStatsets() {
+ PeriodicStatsAccumulator cumulative = new CumulativePeriodicStats(8);
+ PeriodicStatsAccumulator status = new StatePeriodicStats(8);
+
+ cumulative.extend(0.0D, 0);
+ cumulative.extend(0.4375D, 700); // 200 per octant
+ cumulative.extend(0.5625D, 1100); // 0.5 = 900
+ cumulative.extend(0.625D, 1300);
+ cumulative.extend(1.0D, 7901);
+
+ int[] results = cumulative.getValues();
+
+ for (int i = 0; i < 8; ++i) {
+ System.err.println("segment i = " + results[i]);
+ }
+
+ assertEquals("Bad interpolation in cumulative segment 0", 200, results[0]);
+ assertEquals("Bad interpolation in cumulative segment 1", 200, results[1]);
+ assertEquals("Bad interpolation in cumulative segment 2", 200, results[2]);
+ assertEquals("Bad interpolation in cumulative segment 3", 300, results[3]);
+ assertEquals("Bad interpolation in cumulative segment 4", 400, results[4]);
+ assertEquals("Bad interpolation in cumulative segment 5", 2200, results[5]);
+ // these are rounded down
+ assertEquals("Bad interpolation in cumulative segment 6", 2200, results[6]);
+ assertEquals("Bad interpolation in cumulative segment 7", 2201, results[7]);
+
+ status.extend(0.0D, 0);
+ status.extend(1.0D/16.0D, 300); // + 75 for bucket 0
+ status.extend(3.0D/16.0D, 700); // + 200 for 0, +300 for 1
+ status.extend(7.0D/16.0D, 2300); // + 450 for 1, + 1500 for 2, + 1050 for 3
+ status.extend(1.0D, 1400); // +1125 for 3, +2100 for 4, +1900 for 5,
+                            //  +1700 for 6, +1500 for 7
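+ // Editorial worked check for bucket 0 (progress [0, 1/8)):
+ //  extend(1/16, 300) covers the first half with mean (0+300)/2 = 150,
+ //  contributing 150 * (1/16) * 8 = 75; extend(3/16, 700) interpolates
+ //  500 at the 2/16 boundary, so the second half has mean
+ //  (300+500)/2 = 400 and contributes 400 * (1/16) * 8 = 200,
+ //  giving the 275 asserted below.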
+
+ results = status.getValues();
+
+ assertEquals("Bad interpolation in status segment 0", 275, results[0]);
+ assertEquals("Bad interpolation in status segment 1", 750, results[1]);
+ assertEquals("Bad interpolation in status segment 2", 1500, results[2]);
+ assertEquals("Bad interpolation in status segment 3", 2175, results[3]);
+ assertEquals("Bad interpolation in status segment 4", 2100, results[4]);
+ assertEquals("Bad interpolation in status segment 5", 1900, results[5]);
+ assertEquals("Bad interpolation in status segment 6", 1700, results[6]);
+ assertEquals("Bad interpolation in status segment 7", 1500, results[7]);
+ }
+}
|