hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1209281 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/or...
Date Thu, 01 Dec 2011 22:29:46 GMT
Author: tomwhite
Date: Thu Dec  1 22:29:42 2011
New Revision: 1209281

URL: http://svn.apache.org/viewvc?rev=1209281&view=rev
Log:
MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces introduced in MAPREDUCE-3169. Contributed by Ahmed Radwan.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
      - copied, changed from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputFormat.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLazyOutput.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalMRNotification.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalMRNotification.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
      - copied, changed from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/WordCount.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestChild.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestChild.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/map/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
      - copied unchanged from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
Removed:
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestLocalMRNotification.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/jobcontrol/TestLocalJobControl.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestChild.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLazyOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/map/TestMultithreadedMapper.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1209281&r1=1209280&r2=1209281&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Dec  1 22:29:42 2011
@@ -132,6 +132,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3169 amendment. Deprecate MiniMRCluster. (Ahmed Radwan via
     sseth)
 
+    MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces
+    introduced in MAPREDUCE-3169. (Ahmed Radwan via tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES

Copied: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?p2=hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java&p1=hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java&r1=1209272&r2=1209281&rev=1209281&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Dec  1 22:29:42 2011
@@ -59,19 +59,6 @@ public class TestNoDefaultsJobConf exten
 
     JobConf conf = new JobConf(false);
 
-    //seeding JT and NN info into non-defaults (empty jobconf)
-    String jobTrackerAddress = createJobConf().get(JTConfig.JT_IPC_ADDRESS);
-    if (jobTrackerAddress == null) {
-      jobTrackerAddress = "local";
-    }
-    conf.set(JTConfig.JT_IPC_ADDRESS, jobTrackerAddress);
-    if (jobTrackerAddress == "local") {
-      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
-    }
-    else {
-      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);      
-    }
-    
     conf.set("fs.default.name", createJobConf().get("fs.default.name"));
 
     conf.setJobName("mr");

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java?rev=1209281&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java Thu Dec  1 22:29:42 2011
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Abstract Test case class to run MR in local or cluster mode and in local FS
+ * or DFS.
+ *
+ * The Hadoop instance is started and stopped on each test method.
+ *
+ * If using DFS the filesystem is reformatted at each start (test method).
+ *
+ * Job Configurations should be created using a configuration returned by the
+ * 'createJobConf()' method.
+ */
+public abstract class HadoopTestCase extends TestCase {
+  public static final int LOCAL_MR = 1;
+  public static final int CLUSTER_MR = 2;
+  public static final int LOCAL_FS = 4;
+  public static final int DFS_FS = 8;
+
+  private boolean localMR;
+  private boolean localFS;
+
+  private int taskTrackers;
+  private int dataNodes;
+
+  /**
+   * Creates a testcase for local or cluster MR using DFS.
+   *
+   * The DFS will be formatted regardless of whether one already existed in the
+   * given location.
+   *
+   * @param mrMode indicates if the MR should be local (LOCAL_MR) or cluster
+   * (CLUSTER_MR)
+   * @param fsMode indicates if the FS should be local (LOCAL_FS) or DFS (DFS_FS)
+   *
+   * local FS when using relative PATHs)
+   *
+   * @param taskTrackers number of task trackers to start when using cluster
+   *
+   * @param dataNodes number of data nodes to start when using DFS
+   *
+   * @throws IOException thrown if the base directory cannot be set.
+   */
+  public HadoopTestCase(int mrMode, int fsMode, int taskTrackers, int dataNodes)
+    throws IOException {
+    if (mrMode != LOCAL_MR && mrMode != CLUSTER_MR) {
+      throw new IllegalArgumentException(
+                                         "Invalid MapRed mode, must be LOCAL_MR or CLUSTER_MR");
+    }
+    if (fsMode != LOCAL_FS && fsMode != DFS_FS) {
+      throw new IllegalArgumentException(
+                                         "Invalid FileSystem mode, must be LOCAL_FS or DFS_FS");
+    }
+    if (taskTrackers < 1) {
+      throw new IllegalArgumentException(
+                                         "Invalid taskTrackers value, must be greater than 0");
+    }
+    if (dataNodes < 1) {
+      throw new IllegalArgumentException(
+                                         "Invalid dataNodes value, must be greater than 0");
+    }
+    localMR = (mrMode == LOCAL_MR);
+    localFS = (fsMode == LOCAL_FS);
+    /*
+      JobConf conf = new JobConf();
+      fsRoot = conf.get("hadoop.tmp.dir");
+
+      if (fsRoot == null) {
+      throw new IllegalArgumentException(
+      "hadoop.tmp.dir is not defined");
+      }
+
+      fsRoot = fsRoot.replace(' ', '+') + "/fs";
+
+      File file = new File(fsRoot);
+      if (!file.exists()) {
+      if (!file.mkdirs()) {
+      throw new RuntimeException("Could not create FS base path: " + file);
+      }
+      }
+    */
+    this.taskTrackers = taskTrackers;
+    this.dataNodes = dataNodes;
+  }
+
+  /**
+   * Indicates if the MR is running in local or cluster mode.
+   *
+   * @return returns TRUE if the MR is running locally, FALSE if running in
+   * cluster mode.
+   */
+  public boolean isLocalMR() {
+    return localMR;
+  }
+
+  /**
+   * Indicates if the filesystem is local or DFS.
+   *
+   * @return returns TRUE if the filesystem is local, FALSE if it is DFS.
+   */
+  public boolean isLocalFS() {
+    return localFS;
+  }
+
+
+  private MiniDFSCluster dfsCluster = null;
+  private MiniMRCluster mrCluster = null;
+  private FileSystem fileSystem = null;
+
+  /**
+   * Creates Hadoop instance based on constructor configuration before
+   * a test case is run.
+   *
+   * @throws Exception
+   */
+  protected void setUp() throws Exception {
+    super.setUp();
+    if (localFS) {
+      fileSystem = FileSystem.getLocal(new JobConf());
+    }
+    else {
+      dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null);
+      fileSystem = dfsCluster.getFileSystem();
+    }
+    if (localMR) {
+    }
+    else {
+      //noinspection deprecation
+      mrCluster = new MiniMRCluster(taskTrackers, fileSystem.getUri().toString(), 1);
+    }
+  }
+
+  /**
+   * Destroys Hadoop instance based on constructor configuration after
+   * a test case is run.
+   *
+   * @throws Exception
+   */
+  protected void tearDown() throws Exception {
+    try {
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+      }
+    }
+    catch (Exception ex) {
+      System.out.println(ex);
+    }
+    try {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+    }
+    catch (Exception ex) {
+      System.out.println(ex);
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Returns the Filesystem in use.
+   *
+   * TestCases should use this Filesystem as it
+   * is properly configured with the workingDir for relative PATHs.
+   *
+   * @return the filesystem used by Hadoop.
+   */
+  protected FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  /**
+   * Returns a job configuration preconfigured to run against the Hadoop
+   * managed by the testcase.
+   * @return configuration that works on the testcase Hadoop instance
+   */
+  protected JobConf createJobConf() {
+    if (localMR) {
+      JobConf conf = new JobConf();
+      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+      return conf;
+    } 
+    else {
+      return mrCluster.createJobConf();
+    }
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/HadoopTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java?rev=1209281&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java Thu Dec  1 22:29:42 2011
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.ServletException;
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+/**
+ * Base class to test Job end notification in local and cluster mode.
+ *
+ * Starts up hadoop in Local or Cluster mode (by extending the
+ * HadoopTestCase class) and it starts a servlet engine that hosts
+ * a servlet that will receive the notification of job finalization.
+ *
+ * The notification servlet returns an HTTP 400 the first time it is called
+ * and an HTTP 200 the second time, thus testing retry.
+ *
+ * In both cases the local file system is used (this is irrelevant for
+ * the tested functionality)
+ *
+ * 
+ */
+public abstract class NotificationTestCase extends HadoopTestCase {
+
+  protected NotificationTestCase(int mode) throws IOException {
+    super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  private int port;
+  private String contextPath = "/notification";
+  private String servletPath = "/mapred";
+  private Server webServer;
+
+  private void startHttpServer() throws Exception {
+
+    // Create the webServer
+    if (webServer != null) {
+      webServer.stop();
+      webServer = null;
+    }
+    webServer = new Server(0);
+
+    Context context = new Context(webServer, contextPath);
+
+    // create servlet handler
+    context.addServlet(new ServletHolder(new NotificationServlet()),
+                       servletPath);
+
+    // Start webServer
+    webServer.start();
+    port = webServer.getConnectors()[0].getLocalPort();
+
+  }
+
+  private void stopHttpServer() throws Exception {
+    if (webServer != null) {
+      webServer.stop();
+      webServer.destroy();
+      webServer = null;
+    }
+  }
+
+  public static class NotificationServlet extends HttpServlet {
+    public static int counter = 0;
+    private static final long serialVersionUID = 1L;
+    
+    protected void doGet(HttpServletRequest req, HttpServletResponse res)
+      throws ServletException, IOException {
+      switch (counter) {
+        case 0:
+        {
+          assertTrue(req.getQueryString().contains("SUCCEEDED"));
+        }
+        break;
+        case 2:
+        {
+          assertTrue(req.getQueryString().contains("KILLED"));
+        }
+        break;
+        case 4:
+        {
+          assertTrue(req.getQueryString().contains("FAILED"));
+        }
+        break;
+      }
+      if (counter % 2 == 0) {
+        res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
+      }
+      else {
+        res.setStatus(HttpServletResponse.SC_OK);
+      }
+      counter++;
+    }
+  }
+
+  private String getNotificationUrlTemplate() {
+    return "http://localhost:" + port + contextPath + servletPath +
+      "?jobId=$jobId&amp;jobStatus=$jobStatus";
+  }
+
+  protected JobConf createJobConf() {
+    JobConf conf = super.createJobConf();
+    conf.setJobEndNotificationURI(getNotificationUrlTemplate());
+    conf.setInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 3);
+    conf.setInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 200);
+    return conf;
+  }
+
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    startHttpServer();
+  }
+
+  protected void tearDown() throws Exception {
+    stopHttpServer();
+    super.tearDown();
+  }
+
+  public void testMR() throws Exception {
+    System.out.println(launchWordCount(this.createJobConf(),
+                                       "a b c d e f g h", 1, 1));
+    Thread.sleep(2000);
+    assertEquals(2, NotificationServlet.counter);
+    
+    Path inDir = new Path("notificationjob/input");
+    Path outDir = new Path("notificationjob/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data","/tmp")
+        .toString().replace(' ', '+');;
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+    // run a job with KILLED status
+    System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
+                                                outDir).getID());
+    Thread.sleep(2000);
+    assertEquals(4, NotificationServlet.counter);
+    
+    // run a job with FAILED status
+    System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
+                                                outDir).getID());
+    Thread.sleep(2000);
+    assertEquals(6, NotificationServlet.counter);
+  }
+
+  private String launchWordCount(JobConf conf,
+                                 String input,
+                                 int numMaps,
+                                 int numReduces) throws IOException {
+    Path inDir = new Path("testing/wc/input");
+    Path outDir = new Path("testing/wc/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data","/tmp")
+        .toString().replace(' ', '+');;
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+    conf.setJobName("wordcount");
+    conf.setInputFormat(TextInputFormat.class);
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(WordCount.MapClass.class);
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+    JobClient.runJob(conf);
+    return MapReduceTestUtil.readOutput(outDir, conf);
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java?rev=1209281&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java Thu Dec  1 22:29:42 2011
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.*;
+
+/**
+ * A set of utilities to validate the <b>sort</b> of the map-reduce framework.
+ * This utility program has 2 main parts:
+ * 1. Checking the records' statistics
+ *   a) Validates the no. of bytes and records in sort's input & output. 
+ *   b) Validates the xor of the md5's of each key/value pair.
+ *   c) Ensures same key/value is present in both input and output.
+ * 2. Check individual records  to ensure each record is present in both
+ *    the input and the output of the sort (expensive on large data-sets). 
+ *    
+ * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
+ *            [-m <i>maps</i>] [-r <i>reduces</i>] [-deep] 
+ *            -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i> 
+ */
+public class SortValidator extends Configured implements Tool {
+
+  static private final IntWritable sortInput = new IntWritable(1); 
+  static private final IntWritable sortOutput = new IntWritable(2); 
+  static public String SORT_REDUCES = 
+    "mapreduce.sortvalidator.sort.reduce.tasks";
+  static public String MAPS_PER_HOST = "mapreduce.sortvalidator.mapsperhost";
+  static public String REDUCES_PER_HOST = 
+    "mapreduce.sortvalidator.reducesperhost";
+  static void printUsage() {
+    System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
+                       "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
+    System.exit(1);
+  }
+
+  static private IntWritable deduceInputFile(JobConf job) {
+    Path[] inputPaths = FileInputFormat.getInputPaths(job);
+    Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE));
+
+    // value == one for sort-input; value == two for sort-output
+    return (inputFile.getParent().equals(inputPaths[0])) ? 
+        sortInput : sortOutput;
+  }
+  
+  static private byte[] pair(BytesWritable a, BytesWritable b) {
+    byte[] pairData = new byte[a.getLength()+ b.getLength()];
+    System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
+    System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
+    return pairData;
+  }
+
+  private static final PathFilter sortPathsFilter = new PathFilter() {
+    public boolean accept(Path path) {
+      return (path.getName().startsWith("part-"));
+    }
+  };
+  
+  /**
+   * A simple map-reduce job which checks consistency of the
+   * MapReduce framework's sort by checking:
+   * a) Records are sorted correctly
+   * b) Keys are partitioned correctly
+   * c) The input and output have same no. of bytes and records.
+   * d) The input and output have the correct 'checksum' by xor'ing 
+   *    the md5 of each record.
+   *    
+   */
+  public static class RecordStatsChecker {
+
+    /**
+     * Generic way to get <b>raw</b> data from a {@link Writable}.
+     */
+    static class Raw {
+      /**
+       * Get raw data bytes from a {@link Writable}
+       * @param writable {@link Writable} object from whom to get the raw data
+       * @return raw data of the writable
+       */
+      public byte[] getRawBytes(Writable writable) {
+        return writable.toString().getBytes(); 
+      } 
+      
+      /**
+       * Get number of raw data bytes of the {@link Writable}
+       * @param writable {@link Writable} object from whom to get the raw data
+       *                 length
+       * @return number of raw data bytes
+       */
+      public int getRawBytesLength(Writable writable) {
+        return writable.toString().getBytes().length; 
+      }
+    }
+
+    /**
+     * Specialization of {@link Raw} for {@link BytesWritable}.
+     */
+    static class RawBytesWritable extends Raw  {
+      public byte[] getRawBytes(Writable bw) {
+        return ((BytesWritable)bw).getBytes();
+      }
+      public int getRawBytesLength(Writable bw) {
+        return ((BytesWritable)bw).getLength(); 
+      }
+    }
+    
+    /**
+     * Specialization of {@link Raw} for {@link Text}.
+     */
+    static class RawText extends Raw  {
+      public byte[] getRawBytes(Writable text) {
+        return ((Text)text).getBytes();
+      }
+      public int getRawBytesLength(Writable text) {
+        return ((Text)text).getLength();
+      }
+    }
+    
+    private static Raw createRaw(Class rawClass) {
+      if (rawClass == Text.class) {
+        return new RawText();
+      } else if (rawClass == BytesWritable.class) {
+        System.err.println("Returning " + RawBytesWritable.class);
+        return new RawBytesWritable();
+      }      
+      return new Raw();
+    }
+
+    public static class RecordStatsWritable implements Writable {
+      private long bytes = 0;
+      private long records = 0;
+      private int checksum = 0;
+      
+      public RecordStatsWritable() {}
+      
+      public RecordStatsWritable(long bytes, long records, int checksum) {
+        this.bytes = bytes;
+        this.records = records;
+        this.checksum = checksum;
+      }
+      
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVLong(out, bytes);
+        WritableUtils.writeVLong(out, records);
+        WritableUtils.writeVInt(out, checksum);
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        bytes = WritableUtils.readVLong(in);
+        records = WritableUtils.readVLong(in);
+        checksum = WritableUtils.readVInt(in);
+      }
+      
+      public long getBytes() { return bytes; }
+      public long getRecords() { return records; }
+      public int getChecksum() { return checksum; }
+    }
+    
+    public static class Map extends MapReduceBase
+      implements Mapper<WritableComparable, Writable,
+                        IntWritable, RecordStatsWritable> {
+      
+      private IntWritable key = null;
+      private WritableComparable prevKey = null;
+      private Class<? extends WritableComparable> keyClass;
+      private Partitioner<WritableComparable, Writable> partitioner = null;
+      private int partition = -1;
+      private int noSortReducers = -1;
+      private long recordId = -1;
+      
+      private Raw rawKey;
+      private Raw rawValue;
+
+      public void configure(JobConf job) {
+        // 'key' == sortInput for sort-input; key == sortOutput for sort-output
+        key = deduceInputFile(job);
+        
+        if (key == sortOutput) {
+          partitioner = new HashPartitioner<WritableComparable, Writable>();
+          
+          // Figure the 'current' partition and no. of reduces of the 'sort'
+          try {
+            URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
+            String inputFile = inputURI.getPath();
+            // part file is of the form part-r-xxxxx
+            partition = Integer.valueOf(inputFile.substring(
+              inputFile.lastIndexOf("part") + 7)).intValue();
+            noSortReducers = job.getInt(SORT_REDUCES, -1);
+          } catch (Exception e) {
+            System.err.println("Caught: " + e);
+            System.exit(-1);
+          }
+        }
+      }
+      
+      @SuppressWarnings("unchecked")
+      public void map(WritableComparable key, Writable value,
+                      OutputCollector<IntWritable, RecordStatsWritable> output, 
+                      Reporter reporter) throws IOException {
+        // Set up rawKey and rawValue on the first call to 'map'
+        if (recordId == -1) {
+         rawKey = createRaw(key.getClass());
+         rawValue = createRaw(value.getClass());
+        }
+        ++recordId;
+        
+        if (this.key == sortOutput) {
+          // Check if keys are 'sorted' if this  
+          // record is from sort's output
+          if (prevKey == null) {
+            prevKey = key;
+            keyClass = prevKey.getClass();
+          } else {
+            // Sanity check
+            if (keyClass != key.getClass()) {
+              throw new IOException("Type mismatch in key: expected " +
+                                    keyClass.getName() + ", received " +
+                                    key.getClass().getName());
+            }
+            
+            // Check if they were sorted correctly
+            if (prevKey.compareTo(key) > 0) {
+              throw new IOException("The 'map-reduce' framework wrongly" +
+                                    " classifed (" + prevKey + ") > (" + 
+                                    key + ") "+ "for record# " + recordId); 
+            }
+            prevKey = key;
+          }
+
+          // Check if the sorted output is 'partitioned' right
+          int keyPartition = 
+            partitioner.getPartition(key, value, noSortReducers);
+          if (partition != keyPartition) {
+            throw new IOException("Partitions do not match for record# " + 
+                                  recordId + " ! - '" + partition + "' v/s '" + 
+                                  keyPartition + "'");
+          }
+        }
+
+        // Construct the record-stats and output (this.key, record-stats)
+        byte[] keyBytes = rawKey.getRawBytes(key);
+        int keyBytesLen = rawKey.getRawBytesLength(key);
+        byte[] valueBytes = rawValue.getRawBytes(value);
+        int valueBytesLen = rawValue.getRawBytesLength(value);
+        
+        int keyValueChecksum = 
+          (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
+           WritableComparator.hashBytes(valueBytes, valueBytesLen));
+
+        output.collect(this.key, 
+                       new RecordStatsWritable((keyBytesLen+valueBytesLen),
+                       1, keyValueChecksum)
+                      );
+      }
+      
+    }
+    
+    public static class Reduce extends MapReduceBase
+      implements Reducer<IntWritable, RecordStatsWritable,
+                         IntWritable, RecordStatsWritable> {
+      
+      public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
+                         OutputCollector<IntWritable,
+                                         RecordStatsWritable> output, 
+                         Reporter reporter) throws IOException {
+        long bytes = 0;
+        long records = 0;
+        int xor = 0;
+        while (values.hasNext()) {
+          RecordStatsWritable stats = values.next();
+          bytes += stats.getBytes();
+          records += stats.getRecords();
+          xor ^= stats.getChecksum(); 
+        }
+        
+        output.collect(key, new RecordStatsWritable(bytes, records, xor));
+      }
+    }
+    
+    public static class NonSplitableSequenceFileInputFormat 
+      extends SequenceFileInputFormat {
+      protected boolean isSplitable(FileSystem fs, Path filename) {
+        return false;
+      }
+    }
+    
+    static void checkRecords(Configuration defaults, 
+                             Path sortInput, Path sortOutput) throws IOException {
+      FileSystem inputfs = sortInput.getFileSystem(defaults);
+      FileSystem outputfs = sortOutput.getFileSystem(defaults);
+      FileSystem defaultfs = FileSystem.get(defaults);
+      JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
+      jobConf.setJobName("sortvalidate-recordstats-checker");
+
+      int noSortReduceTasks = 
+        outputfs.listStatus(sortOutput, sortPathsFilter).length;
+      jobConf.setInt(SORT_REDUCES, noSortReduceTasks);
+      int noSortInputpaths =  inputfs.listStatus(sortInput).length;
+
+      jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+      
+      jobConf.setOutputKeyClass(IntWritable.class);
+      jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);
+      
+      jobConf.setMapperClass(Map.class);
+      jobConf.setCombinerClass(Reduce.class);
+      jobConf.setReducerClass(Reduce.class);
+      
+      jobConf.setNumMapTasks(noSortReduceTasks);
+      jobConf.setNumReduceTasks(1);
+
+      FileInputFormat.setInputPaths(jobConf, sortInput);
+      FileInputFormat.addInputPath(jobConf, sortOutput);
+      Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
+      if (defaultfs.exists(outputPath)) {
+        defaultfs.delete(outputPath, true);
+      }
+      FileOutputFormat.setOutputPath(jobConf, outputPath);
+      
+      // Uncomment to run locally in a single process
+      //job_conf.set(JTConfig.JT, "local");
+      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
+      System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
+                         "from " + inputPaths[0] + " (" + 
+                         noSortInputpaths + " files), " + 
+                         inputPaths[1] + " (" + 
+                         noSortReduceTasks + 
+                         " files) into " + 
+                         FileOutputFormat.getOutputPath(jobConf) + 
+                         " with 1 reducer.");
+      Date startTime = new Date();
+      System.out.println("Job started: " + startTime);
+      JobClient.runJob(jobConf);
+      Date end_time = new Date();
+      System.out.println("Job ended: " + end_time);
+      System.out.println("The job took " + 
+                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+      
+      // Check to ensure that the statistics of the 
+      // framework's sort-input and sort-output match
+      SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
+                                                          new Path(outputPath, "part-00000"), defaults);
+      IntWritable k1 = new IntWritable();
+      IntWritable k2 = new IntWritable();
+      RecordStatsWritable v1 = new RecordStatsWritable();
+      RecordStatsWritable v2 = new RecordStatsWritable();
+      if (!stats.next(k1, v1)) {
+        throw new IOException("Failed to read record #1 from reduce's output");
+      }
+      if (!stats.next(k2, v2)) {
+        throw new IOException("Failed to read record #2 from reduce's output");
+      }
+
+      if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || 
+          v1.getChecksum() != v2.getChecksum()) {
+        throw new IOException("(" + 
+                              v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
+                              v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
+      }
+    }
+
+  }
+  
+  /**
+   * A simple map-reduce task to check if the input and the output
+   * of the framework's sort is consistent by ensuring each record 
+   * is present in both the input and the output.
+   * 
+   */
+  public static class RecordChecker {
+    
+    public static class Map extends MapReduceBase
+      implements Mapper<BytesWritable, BytesWritable,
+                        BytesWritable, IntWritable> {
+      
+      private IntWritable value = null;
+      
+      public void configure(JobConf job) {
+        // value == one for sort-input; value == two for sort-output
+        value = deduceInputFile(job);
+      }
+      
+      public void map(BytesWritable key, 
+                      BytesWritable value,
+                      OutputCollector<BytesWritable, IntWritable> output, 
+                      Reporter reporter) throws IOException {
+        // newKey = (key, value)
+        BytesWritable keyValue = new BytesWritable(pair(key, value));
+    
+        // output (newKey, value)
+        output.collect(keyValue, this.value);
+      }
+    }
+    
+    public static class Reduce extends MapReduceBase
+      implements Reducer<BytesWritable, IntWritable,
+                        BytesWritable, IntWritable> {
+      
+      public void reduce(BytesWritable key, Iterator<IntWritable> values,
+                         OutputCollector<BytesWritable, IntWritable> output,
+                         Reporter reporter) throws IOException {
+        int ones = 0;
+        int twos = 0;
+        while (values.hasNext()) {
+          IntWritable count = values.next(); 
+          if (count.equals(sortInput)) {
+            ++ones;
+          } else if (count.equals(sortOutput)) {
+            ++twos;
+          } else {
+            throw new IOException("Invalid 'value' of " + count.get() + 
+                                  " for (key,value): " + key.toString());
+          }
+        }
+        
+        // Check to ensure there are equal no. of ones and twos
+        if (ones != twos) {
+          throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
+                                ") for (key, value): " + key.toString());
+        }
+      }
+    }
+    
+    static void checkRecords(Configuration defaults, int noMaps, int noReduces,
+                             Path sortInput, Path sortOutput) throws IOException {
+      JobConf jobConf = new JobConf(defaults, RecordChecker.class);
+      jobConf.setJobName("sortvalidate-record-checker");
+      
+      jobConf.setInputFormat(SequenceFileInputFormat.class);
+      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+      
+      jobConf.setOutputKeyClass(BytesWritable.class);
+      jobConf.setOutputValueClass(IntWritable.class);
+      
+      jobConf.setMapperClass(Map.class);        
+      jobConf.setReducerClass(Reduce.class);
+      
+      JobClient client = new JobClient(jobConf);
+      ClusterStatus cluster = client.getClusterStatus();
+      if (noMaps == -1) {
+        noMaps = cluster.getTaskTrackers() * 
+          jobConf.getInt(MAPS_PER_HOST, 10);
+      }
+      if (noReduces == -1) {
+        noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
+        String sortReduces = jobConf.get(REDUCES_PER_HOST);
+        if (sortReduces != null) {
+           noReduces = cluster.getTaskTrackers() * 
+                           Integer.parseInt(sortReduces);
+        }
+      }
+      jobConf.setNumMapTasks(noMaps);
+      jobConf.setNumReduceTasks(noReduces);
+      
+      FileInputFormat.setInputPaths(jobConf, sortInput);
+      FileInputFormat.addInputPath(jobConf, sortOutput);
+      Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
+      FileSystem fs = FileSystem.get(defaults);
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+      FileOutputFormat.setOutputPath(jobConf, outputPath);
+      
+      // Uncomment to run locally in a single process
+      //job_conf.set(JTConfig.JT, "local");
+      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
+      System.out.println("\nSortValidator.RecordChecker: Running on " +
+                         cluster.getTaskTrackers() +
+                        " nodes to validate sort from " + 
+                         inputPaths[0] + ", " + 
+                         inputPaths[1] + " into " + 
+                         FileOutputFormat.getOutputPath(jobConf) + 
+                         " with " + noReduces + " reduces.");
+      Date startTime = new Date();
+      System.out.println("Job started: " + startTime);
+      JobClient.runJob(jobConf);
+      Date end_time = new Date();
+      System.out.println("Job ended: " + end_time);
+      System.out.println("The job took " + 
+                         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+    }
+  }
+
+  
+  /**
+   * The main driver for sort-validator program.
+   * Invoke this method to submit the map/reduce job.
+   * @throws IOException When there is communication problems with the 
+   *                     job tracker.
+   */
+  public int run(String[] args) throws Exception {
+    Configuration defaults = getConf();
+    
+    int noMaps = -1, noReduces = -1;
+    Path sortInput = null, sortOutput = null;
+    boolean deepTest = false;
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          noMaps = Integer.parseInt(args[++i]);
+        } else if ("-r".equals(args[i])) {
+          noReduces = Integer.parseInt(args[++i]);
+        } else if ("-sortInput".equals(args[i])){
+          sortInput = new Path(args[++i]);
+        } else if ("-sortOutput".equals(args[i])){
+          sortOutput = new Path(args[++i]);
+        } else if ("-deep".equals(args[i])) {
+          deepTest = true;
+        } else {
+          printUsage();
+          return -1;
+        }
+      } catch (NumberFormatException except) {
+        System.err.println("ERROR: Integer expected instead of " + args[i]);
+        printUsage();
+        return -1;
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.err.println("ERROR: Required parameter missing from " +
+                           args[i-1]);
+        printUsage();
+        return -1;
+      }
+    }
+    
+    // Sanity check
+    if (sortInput == null || sortOutput == null) {
+      printUsage();
+      return -2;
+    }
+
+    // Check if the records are consistent and sorted correctly
+    RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);
+
+    // Check if the same records are present in sort's inputs & outputs
+    if (deepTest) {
+      RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput, 
+                                 sortOutput);
+    }
+    
+    System.out.println("\nSUCCESS! Validated the MapReduce framework's 'sort'" +
+                       " successfully.");
+    
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
+    System.exit(res);
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SortValidator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (from r1209272, hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java?p2=hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java&p1=hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java&r1=1209272&r2=1209281&rev=1209281&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java Thu Dec  1 22:29:42 2011
@@ -48,7 +48,6 @@ public class TestSpecialCharactersInOutp
   private static final String OUTPUT_FILENAME = "result[0]";
   
   public static boolean launchJob(URI fileSys,
-                                  String jobTracker,
                                   JobConf conf,
                                   int numMaps,
                                   int numReduces) throws IOException {
@@ -68,8 +67,6 @@ public class TestSpecialCharactersInOutp
 
     // use WordCount example
     FileSystem.setDefaultUri(conf, fileSys);
-    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
-    conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
     conf.setJobName("foo");
 
     conf.setInputFormat(TextInputFormat.class);
@@ -113,11 +110,9 @@ public class TestSpecialCharactersInOutp
       fileSys = dfs.getFileSystem();
       namenode = fileSys.getUri().toString();
       mr = new MiniMRCluster(taskTrackers, namenode, 2);
-      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
       JobConf jobConf = new JobConf();
       boolean result;
-      result = launchJob(fileSys.getUri(), jobTrackerName, jobConf, 
-                              3, 1);
+      result = launchJob(fileSys.getUri(), jobConf, 3, 1);
       assertTrue(result);
           
     } finally {

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java?rev=1209281&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java Thu Dec  1 22:29:42 2011
@@ -0,0 +1,787 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.commons.logging.Log;
+
+/** 
+ * Utilities used in unit test.
+ *  
+ */
+public class UtilsForTests {
+
+  static final Log LOG = LogFactory.getLog(UtilsForTests.class);
+
+  final static long KB = 1024L * 1;
+  final static long MB = 1024L * KB;
+  final static long GB = 1024L * MB;
+  final static long TB = 1024L * GB;
+  final static long PB = 1024L * TB;
+  final static Object waitLock = new Object();
+
+  static DecimalFormat dfm = new DecimalFormat("####.000");
+  static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
+
+  public static String dfmt(double d) {
+    return dfm.format(d);
+  }
+
+  public static String ifmt(double d) {
+    return ifm.format(d);
+  }
+
+  public static String formatBytes(long numBytes) {
+    StringBuffer buf = new StringBuffer();
+    boolean bDetails = true;
+    double num = numBytes;
+
+    if (numBytes < KB) {
+      buf.append(numBytes + " B");
+      bDetails = false;
+    } else if (numBytes < MB) {
+      buf.append(dfmt(num / KB) + " KB");
+    } else if (numBytes < GB) {
+      buf.append(dfmt(num / MB) + " MB");
+    } else if (numBytes < TB) {
+      buf.append(dfmt(num / GB) + " GB");
+    } else if (numBytes < PB) {
+      buf.append(dfmt(num / TB) + " TB");
+    } else {
+      buf.append(dfmt(num / PB) + " PB");
+    }
+    if (bDetails) {
+      buf.append(" (" + ifmt(numBytes) + " bytes)");
+    }
+    return buf.toString();
+  }
+
+  public static String formatBytes2(long numBytes) { // e.g. 1536 -> "1 KB 512 B"
+    StringBuffer buf = new StringBuffer();
+    long u = 0; // whole-unit count for the unit currently being peeled off
+    if (numBytes >= TB) {
+      u = numBytes / TB;
+      numBytes -= u * TB;
+      buf.append(u + " TB ");
+    }
+    if (numBytes >= GB) {
+      u = numBytes / GB;
+      numBytes -= u * GB;
+      buf.append(u + " GB ");
+    }
+    if (numBytes >= MB) {
+      u = numBytes / MB;
+      numBytes -= u * MB;
+      buf.append(u + " MB ");
+    }
+    if (numBytes >= KB) {
+      u = numBytes / KB;
+      numBytes -= u * KB;
+      buf.append(u + " KB ");
+    }
+    buf.append(numBytes + " B"); // remaining bytes (not the last unit count), even if zero
+    return buf.toString();
+  }
+
+  static final String regexpSpecials = "[]()?*+|.!^-\\~@";
+
+  public static String regexpEscape(String plain) {
+    StringBuffer buf = new StringBuffer();
+    char[] ch = plain.toCharArray();
+    int csup = ch.length;
+    for (int c = 0; c < csup; c++) {
+      if (regexpSpecials.indexOf(ch[c]) != -1) {
+        buf.append("\\");
+      }
+      buf.append(ch[c]);
+    }
+    return buf.toString();
+  }
+
+  public static String safeGetCanonicalPath(File f) {
+    try {
+      String s = f.getCanonicalPath();
+      return (s == null) ? f.toString() : s;
+    } catch (IOException io) {
+      return f.toString();
+    }
+  }
+
+  public static String slurp(File f) throws IOException { // whole file as UTF-8
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    try {
+      // read() may return fewer bytes than asked for, so loop until full or EOF
+      int off = 0;
+      for (int n; off < len && (n = in.read(buf, off, len - off)) > 0; ) { off += n; }
+      return new String(buf, 0, off, "UTF-8"); // decode only the bytes actually read
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Reads an entire file from the given Hadoop file system into a string,
+   * decoding it as UTF-8.
+   *
+   * @param p the path of the file to read
+   * @param fs the file system that holds the file
+   * @return the file contents
+   * @throws IOException if the file status cannot be obtained or the file
+   *         cannot be opened or read
+   */
+  public static String slurpHadoop(Path p, FileSystem fs) throws IOException {
+    int len = (int) fs.getFileStatus(p).getLen();
+    byte[] buf = new byte[len];
+    int filled = 0;
+    InputStream in = fs.open(p);
+    try {
+      // A single read() may return fewer bytes than requested, so keep
+      // reading until the buffer is full or the stream ends.
+      while (filled < len) {
+        int n = in.read(buf, filled, len - filled);
+        if (n < 0) {
+          break;
+        }
+        filled += n;
+      }
+    } finally {
+      in.close();
+    }
+    // Decode only the bytes actually read.
+    return new String(buf, 0, filled, "UTF-8");
+  }
+
+  public static String rjustify(String s, int width) {
+    if (s == null) s = "null";
+    if (width > s.length()) {
+      s = getSpace(width - s.length()) + s;
+    }
+    return s;
+  }
+
+  public static String ljustify(String s, int width) {
+    if (s == null) s = "null";
+    if (width > s.length()) {
+      s = s + getSpace(width - s.length());
+    }
+    return s;
+  }
+
+  static char[] space;
+  static {
+    space = new char[300];
+    Arrays.fill(space, '\u0020');
+  }
+
+  public static String getSpace(int len) {
+    if (len > space.length) {
+      space = new char[Math.max(len, 2 * space.length)];
+      Arrays.fill(space, '\u0020');
+    }
+    return new String(space, 0, len);
+  }
+  
+  /**
+   * Looks up the status of a single job by scanning every job the client
+   * reports.
+   *
+   * @param jc the job client to query
+   * @param id the id of the job of interest
+   * @return the matching status, or null when no reported job has that id
+   */
+  static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
+    JobStatus match = null;
+    JobStatus[] all = jc.getAllJobs();
+    for (int i = 0; match == null && i < all.length; i++) {
+      if (all[i].getJobID().equals(id)) {
+        match = all[i];
+      }
+    }
+    return match;
+  }
+  
+  /**
+   * Blocks the calling thread for up to {@code duration} milliseconds by
+   * waiting on the shared wait lock. An interrupt ends the wait early and is
+   * otherwise ignored.
+   */
+  public static void waitFor(long duration) {
+    synchronized (waitLock) {
+      try {
+        waitLock.wait(duration);
+      } catch (InterruptedException ie) {
+        // intentionally ignored: callers poll in a loop
+      }
+    }
+  }
+  
+  /**
+   * Polls the cluster status every 100 ms until the jobtracker reports
+   * RUNNING. An IOException while fetching the status is ignored and the
+   * polling simply starts over.
+   */
+  static void waitForJobTracker(JobClient jobClient) {
+    for (;;) {
+      try {
+        while (jobClient.getClusterStatus().getJobTrackerStatus()
+               != JobTrackerStatus.RUNNING) {
+          waitFor(100);
+        }
+        return; // the jt is ready
+      } catch (IOException ioe) {
+        // status not available yet; try again
+      }
+    }
+  }
+  
+  /**
+   * Blocks until every job known to the client is in a terminal state
+   * (SUCCEEDED, FAILED or KILLED), re-checking every 100 ms.
+   */
+  static void waitTillDone(JobClient jobClient) throws IOException {
+    boolean pending;
+    do {
+      pending = false;
+      for (JobStatus status : jobClient.getAllJobs()) {
+        int state = status.getRunState();
+        boolean finished = state == JobStatus.SUCCEEDED
+            || state == JobStatus.FAILED
+            || state == JobStatus.KILLED;
+        if (!finished) {
+          pending = true;
+          break;
+        }
+      }
+      if (pending) {
+        waitFor(100);
+      }
+    } while (pending);
+  }
+  
+  /**
+   * Configures a job whose later half of map tasks block until a signal file
+   * appears (see {@link HalfWaitingMapper}).
+   *
+   * @param jobConf the configuration to populate
+   * @param inDir the job input directory
+   * @param outputPath the job output directory
+   * @param numMaps the number of map tasks
+   * @param numRed the number of reduce tasks
+   * @param jobName the job name
+   * @param mapSignalFilename the signal file the maps wait on
+   * @param redSignalFilename the signal file stored for the reduces
+   */
+  static void configureWaitingJobConf(JobConf jobConf, Path inDir,
+                                      Path outputPath, int numMaps, int numRed,
+                                      String jobName, String mapSignalFilename,
+                                      String redSignalFilename)
+  throws IOException {
+    jobConf.setJobName(jobName);
+    // The input format used to be set twice; only the last value
+    // (RandomInputFormat) ever took effect, so the dead first call is gone.
+    jobConf.setInputFormat(RandomInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outputPath);
+    jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(numRed);
+    jobConf.setJar("build/test/mapred/testjar/testjob.jar");
+    jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
+    jobConf.set(getTaskSignalParameter(false), redSignalFilename);
+  }
+
+  /**
+   * Commonly used map and reduce classes 
+   */
+  
+  /** 
+   * Map is a Mapper that just waits for a file to be created on the dfs. The 
+   * file creation is a signal to the mappers and hence acts as a waiting job. 
+   */
+
+  static class WaitingMapper 
+  extends MapReduceBase 
+  implements Mapper<WritableComparable, Writable, 
+                    WritableComparable, Writable> {
+
+    FileSystem fs = null;
+    Path signal;
+    int id = 0;
+    int totalMaps = 0;
+
+    /**
+     * Checks if the map task needs to wait. By default all the maps will wait.
+     * This method needs to be overridden to make a custom waiting mapper. 
+     */
+    public boolean shouldWait(int id) {
+      return true;
+    }
+    
+    /**
+     * Returns a signal file on which the map task should wait. By default all 
+     * the maps wait on a single file passed as test.mapred.map.waiting.target.
+     * This method needs to be overridden to make a custom waiting mapper
+     */
+    public Path getSignalFile(int id) {
+      return signal;
+    }
+    
+    /** The waiting function.  The map exits once it gets a signal. Here the 
+     * signal is the file existence. 
+     */
+    public void map(WritableComparable key, Writable val, 
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      if (shouldWait(id)) {
+        if (fs != null) {
+          while (!fs.exists(getSignalFile(id))) {
+            try {
+              reporter.progress();
+              synchronized (this) {
+                this.wait(1000); // wait for 1 sec
+              }
+            } catch (InterruptedException ie) {
+              System.out.println("Interrupted while the map was waiting for "
+                                 + " the signal.");
+              break;
+            }
+          }
+        } else {
+          throw new IOException("Could not get the DFS!!");
+        }
+      }
+    }
+
+    public void configure(JobConf conf) {
+      try {
+        String taskId = conf.get(JobContext.TASK_ATTEMPT_ID);
+        id = Integer.parseInt(taskId.split("_")[4]);
+        totalMaps = Integer.parseInt(conf.get(JobContext.NUM_MAPS));
+        fs = FileSystem.get(conf);
+        signal = new Path(conf.get(getTaskSignalParameter(true)));
+      } catch (IOException ioe) {
+        System.out.println("Got an exception while obtaining the filesystem");
+      }
+    }
+  }
+  
+  /**
+   * A waiting mapper in which only the maps with an id of at least
+   * {@code totalMaps / 2} wait for the signal; the others finish immediately.
+   */
+  static class HalfWaitingMapper extends WaitingMapper {
+    @Override
+    public boolean shouldWait(int id) {
+      final int firstWaitingId = totalMaps / 2;
+      return id >= firstWaitingId;
+    }
+  }
+  
+  /**
+   * A Reducer that blocks until a signal file exists on the file system. It
+   * emits no output.
+   */
+
+  static class WaitingReducer extends MapReduceBase
+  implements Reducer<WritableComparable, Writable,
+                     WritableComparable, Writable> {
+
+    FileSystem fs = null;
+    Path signal;
+
+    /**
+     * Waits, reporting progress and re-checking once per second, until the
+     * signal file exists or the thread is interrupted.
+     *
+     * @throws IOException if no file system was obtained in
+     *         {@link #configure(JobConf)} or the existence check fails
+     */
+    public void reduce(WritableComparable key, Iterator<Writable> val,
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter)
+    throws IOException {
+      if (fs == null) {
+        throw new IOException("Could not get the DFS!!");
+      }
+      while (!fs.exists(signal)) {
+        try {
+          reporter.progress();
+          synchronized (this) {
+            this.wait(1000); // wait for 1 sec
+          }
+        } catch (InterruptedException ie) {
+          // The message used to say "map", copied over from WaitingMapper.
+          System.out.println("Interrupted while the reduce was waiting for the"
+                             + " signal.");
+          break;
+        }
+      }
+    }
+
+    /**
+     * Obtains the file system and the reduce signal file from the job
+     * configuration. A failure to get the file system is only logged; it
+     * surfaces later as an IOException from reduce().
+     */
+    public void configure(JobConf conf) {
+      try {
+        fs = FileSystem.get(conf);
+        signal = new Path(conf.get(getTaskSignalParameter(false)));
+      } catch (IOException ioe) {
+        System.out.println("Got an exception while obtaining the filesystem");
+      }
+    }
+  }
+  
+  /**
+   * Returns the configuration key under which the signal file name for map
+   * tasks ({@code isMap == true}) or reduce tasks is stored.
+   */
+  static String getTaskSignalParameter(boolean isMap) {
+    if (isMap) {
+      return "test.mapred.map.waiting.target";
+    }
+    return "test.mapred.reduce.waiting.target";
+  }
+  
+  /**
+   * Signals both the maps and the reduces to proceed by writing the map
+   * signal file and then the reduce signal file with the given replication.
+   */
+  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
+                          String mapSignalFile,
+                          String reduceSignalFile, int replication)
+  throws IOException {
+    String[] signalFiles = { mapSignalFile, reduceSignalFile };
+    for (String signalFile : signalFiles) {
+      writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(signalFile),
+                (short) replication);
+    }
+  }
+  
+  /**
+   * Signals either the maps or the reduces to proceed by writing the
+   * corresponding signal file with a replication of 1.
+   *
+   * @param isMap true to write the map signal file, false for the reduce one
+   */
+  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
+                          boolean isMap, String mapSignalFile,
+                          String reduceSignalFile)
+  throws IOException {
+    String signalFile = isMap ? mapSignalFile : reduceSignalFile;
+    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(signalFile),
+              (short) 1);
+  }
+  
+  /** Returns the string form of the path "signal" inside the given directory. */
+  static String getSignalFile(Path dir) {
+    Path signalPath = new Path(dir, "signal");
+    return signalPath.toString();
+  }
+  
+  /** Returns the string form of the path "map-signal" inside the given directory. */
+  static String getMapSignalFile(Path dir) {
+    Path signalPath = new Path(dir, "map-signal");
+    return signalPath.toString();
+  }
+
+  /** Returns the string form of the path "reduce-signal" inside the given directory. */
+  static String getReduceSignalFile(Path dir) {
+    Path signalPath = new Path(dir, "reduce-signal");
+    return signalPath.toString();
+  }
+  
+  /**
+   * Creates a sequence file holding one empty BytesWritable record, sets its
+   * replication and waits until that replication is reached.
+   *
+   * @param namenode not used by this method
+   * @param conf the configuration used to obtain the file system
+   * @param name the path of the file to create
+   * @param replication the replication factor to set and wait for
+   * @throws IOException if the file cannot be written or replicated
+   */
+  static void writeFile(NameNode namenode, Configuration conf, Path name,
+      short replication) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fileSys, conf, name,
+                                BytesWritable.class, BytesWritable.class,
+                                CompressionType.NONE);
+    try {
+      writer.append(new BytesWritable(), new BytesWritable());
+    } finally {
+      // Close even when append fails so the writer is not leaked.
+      writer.close();
+    }
+    fileSys.setReplication(name, replication);
+    DFSTestUtil.waitReplication(fileSys, name, replication);
+  }
+  
+  // Input formats
+  /**
+   * An input format that fabricates one virtual split per requested map; each
+   * split yields a single record whose key is the split's file name.
+   */
+  public static class RandomInputFormat implements InputFormat<Text, Text> {
+
+    /**
+     * Builds {@code numSplits} one-byte file splits named "dummy-split-N"
+     * under the job output directory.
+     */
+    public InputSplit[] getSplits(JobConf job,
+                                  int numSplits) throws IOException {
+      InputSplit[] splits = new InputSplit[numSplits];
+      Path outDir = FileOutputFormat.getOutputPath(job);
+      for (int index = 0; index < splits.length; index++) {
+        Path dummy = new Path(outDir, "dummy-split-" + index);
+        splits[index] = new FileSplit(dummy, 0, 1, (String[])null);
+      }
+      return splits;
+    }
+
+    /** Record reader that returns exactly one record: the split's name. */
+    static class RandomRecordReader implements RecordReader<Text, Text> {
+      Path name;
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      public boolean next(Text key, Text value) {
+        if (name == null) {
+          return false; // the single record was already handed out
+        }
+        key.set(name.getName());
+        name = null;
+        return true;
+      }
+      public Text createKey() {
+        return new Text();
+      }
+      public Text createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() {}
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    /** Creates a reader over the path of the given (file) split. */
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
+                                                    JobConf job,
+                                                    Reporter reporter)
+    throws IOException {
+      Path splitPath = ((FileSplit) split).getPath();
+      return new RandomRecordReader(splitPath);
+    }
+  }
+
+  // Submit a job using the map/reduce task counts already set in the conf
+  // and return its RunningJob object
+  static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
+                    throws IOException {
+    int numMaps = conf.getNumMapTasks();
+    int numReds = conf.getNumReduceTasks();
+    return runJob(conf, inDir, outDir, numMaps, numReds);
+  }
+
+  // Start a job and return its RunningJob object
+  static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
+                           int numReds) throws IOException {
+
+    String input = "The quick brown fox\n" + "has many silly\n"
+                   + "red fox sox\n";
+    
+    // submit the job and wait for it to complete
+    return runJob(conf, inDir, outDir, numMaps, numReds, input);
+  }
+  
+  /**
+   * Prepares the input and output directories, writes one input file per map
+   * containing {@code input}, and submits the job without waiting for it.
+   *
+   * @param conf the job configuration; input/output settings are overwritten
+   * @param inDir the input directory, created if missing
+   * @param outDir the output directory, deleted first if it exists
+   * @param numMaps the number of map tasks and of input files written
+   * @param numReds the number of reduce tasks
+   * @param input the text written to every input file
+   * @return the handle of the submitted job
+   */
+  static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+                           int numReds, String input) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+
+    for (int i = 0; i < numMaps; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      try {
+        file.writeBytes(input);
+      } finally {
+        // Close even when the write fails so the stream is not leaked.
+        file.close();
+      }
+    }
+
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+
+    JobClient jobClient = new JobClient(conf);
+    return jobClient.submitJob(conf);
+  }
+
+  // Run a job that will be succeeded and wait until it completes
+  public static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
+         throws IOException {
+    conf.setJobName("test-job-succeed");
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    
+    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    return job;
+  }
+
+  // Run a job that will be failed and wait until it completes
+  public static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
+         throws IOException {
+    conf.setJobName("test-job-fail");
+    conf.setMapperClass(FailMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setMaxMapAttempts(1);
+    
+    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    return job;
+  }
+
+  // Run a job that will be killed and wait until it completes
+  public static RunningJob runJobKill(JobConf conf,  Path inDir, Path outDir)
+         throws IOException {
+
+    conf.setJobName("test-job-kill");
+    conf.setMapperClass(KillMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    
+    RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    job.killJob();
+    while (job.cleanupProgress() == 0.0f) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    return job;
+  }
+  
+  /**
+   * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
+   * asynchronously.
+   */
+  public static class InlineCleanupQueue extends CleanupQueue {
+    List<String> stalePaths = new ArrayList<String>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(PathDeletionContext... contexts) {
+      // delete paths in-line
+      for (PathDeletionContext context : contexts) {
+        try {
+          if (!deletePath(context)) {
+            LOG.warn("Stale path " + context.fullPath);
+            stalePaths.add(context.fullPath);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + context.fullPath);
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(context.fullPath);
+        }
+      }
+    }
+  }
+  
+  /**
+   * A Clock whose time starts at zero and only moves when
+   * {@link #advance(long)} is called.
+   */
+  static class FakeClock extends Clock {
+    long time = 0;
+
+    /** Moves the clock forward by the given number of milliseconds. */
+    public void advance(long millis) {
+      time = time + millis;
+    }
+
+    /** Returns the current fake time. */
+    @Override
+    long getTime() {
+      return this.time;
+    }
+  }
+  // Mapper whose map() always throws a RuntimeException
+  static class FailMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      final String message = "failing map";
+      //NOTE- printing to stderr is required for the TestDebugScript test to
+      //succeed
+      System.err.println(message);
+      throw new RuntimeException(message);
+    }
+  }
+
+  // Mapper that sleeps for 1000000 ms per record (or until interrupted).
+  // Used for running a job that will be killed
+  static class KillMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      final long napMillis = 1000000;
+      try {
+        Thread.sleep(napMillis);
+      } catch (InterruptedException e) {
+        // Do nothing
+      }
+    }
+  }
+
+  /**
+   * Writes the given properties to a file as a Hadoop configuration XML
+   * document. The configuration is created without loading the defaults.
+   *
+   * @param confProps the properties to store
+   * @param configFile the file to (over)write
+   * @throws IOException if the file cannot be opened or written
+   */
+  static void setUpConfigFile(Properties confProps, File configFile)
+      throws IOException {
+    Configuration config = new Configuration(false);
+    for (Enumeration<?> e = confProps.propertyNames(); e.hasMoreElements();) {
+      String key = (String) e.nextElement();
+      config.set(key, confProps.getProperty(key));
+    }
+
+    FileOutputStream fos = new FileOutputStream(configFile);
+    try {
+      config.writeXml(fos);
+    } finally {
+      // Close even when writing fails so the file handle is not leaked.
+      fos.close();
+    }
+  }
+
+  /**
+   * Creates a file on the given file system with the given permission and
+   * writes {@code input} to it.
+   * @param dfs FileSystem on which the file is created
+   * @param URIPATH Path of the file to create
+   * @param permission FsPermission File permission
+   * @param input String content written to the file
+   * @return the DataOutputStream that was used for writing; it has already
+   *         been closed when this method returns
+   */
+  public static DataOutputStream
+      createTmpFileDFS(FileSystem dfs, Path URIPATH,
+      FsPermission permission, String input) throws Exception {
+    //Creating the path with the file
+    DataOutputStream file =
+      FileSystem.create(dfs, URIPATH, permission);
+    try {
+      file.writeBytes(input);
+    } finally {
+      // Close even when the write fails so the stream is not leaked.
+      file.close();
+    }
+    return file;
+  }
+
+  /**
+   * Extracts the host part from a long tasktracker name: the text between the
+   * first underscore and the following underscore or colon.
+   * @param taskTrackerLong String The long format of the tasktracker string
+   * @return String The FQDN of the tasktracker
+   * @throws Exception
+   */
+  public static String getFQDNofTT (String taskTrackerLong) throws Exception {
+    // Second underscore-separated field, then everything before the first ':'
+    String afterPrefix = taskTrackerLong.split("_")[1];
+    return afterPrefix.split(":")[0];
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/UtilsForTests.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message