Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Oct 19 02:25:55 2012 @@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -136,6 +138,43 @@ public class TestFileInputFormat { } } + /** + * Test when the input file's length is 0. + */ + @Test + public void testForEmptyFile() throws Exception { + Configuration conf = new Configuration(); + FileSystem fileSys = FileSystem.get(conf); + Path file = new Path("test" + "/file"); + FSDataOutputStream out = fileSys.create(file, true, + conf.getInt("io.file.buffer.size", 4096), (short) 1, (long) 1024); + out.write(new byte[0]); + out.close(); + + // split it using a File input format + DummyInputFormat inFormat = new DummyInputFormat(); + Job job = Job.getInstance(conf); + FileInputFormat.setInputPaths(job, "test"); + List splits = inFormat.getSplits(job); + assertEquals(1, splits.size()); + FileSplit fileSplit = (FileSplit) splits.get(0); + assertEquals(0, fileSplit.getLocations().length); + assertEquals(file.getName(), fileSplit.getPath().getName()); + assertEquals(0, fileSplit.getStart()); + assertEquals(0, fileSplit.getLength()); + + fileSys.delete(file.getParent(), true); + } + + /** Dummy class to extend FileInputFormat*/ + private class DummyInputFormat extends FileInputFormat { + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return null; + } + } + private class FileInputFormatForTest extends FileInputFormat { long splitSize; Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java 
(original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Fri Oct 19 02:25:55 2012 @@ -21,19 +21,25 @@ package org.apache.hadoop.mapreduce.lib. import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.JavaSerializationComparator; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.WritableSerialization; import org.apache.hadoop.mapreduce.MRJobConfig; public class TestTotalOrderPartitioner extends TestCase { @@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner e new Text("yak"), // 9 }; + private static final String[] splitJavaStrings = new String[] { + // -inf // 0 + new String("aabbb"), // 1 + new String("babbb"), // 2 + new String("daddd"), // 3 + new String("dddee"), // 4 + new String("ddhee"), // 5 + new String("dingo"), // 6 + new String("hijjj"), // 7 + new String("n"), // 8 + new String("yak"), // 9 + }; + static class Check { T data; int part; @@ -76,19 +95,41 @@ public class TestTotalOrderPartitioner e testStrings.add(new Check(new Text("hi"), 6)); }; - private static > Path writePartitionFile( + private static final ArrayList> testJavaStrings = + new ArrayList>(); + static { + testJavaStrings.add(new Check(new String("aaaaa"), 0)); + testJavaStrings.add(new Check(new String("aaabb"), 0)); + testJavaStrings.add(new Check(new String("aabbb"), 1)); + testJavaStrings.add(new Check(new String("aaaaa"), 0)); + testJavaStrings.add(new Check(new String("babbb"), 2)); + testJavaStrings.add(new Check(new String("baabb"), 1)); + testJavaStrings.add(new Check(new String("yai"), 8)); + testJavaStrings.add(new Check(new String("yak"), 9)); + testJavaStrings.add(new Check(new String("z"), 9)); + testJavaStrings.add(new Check(new String("ddngo"), 5)); + testJavaStrings.add(new Check(new String("hi"), 6)); + }; + + + private static Path writePartitionFile( String testname, Configuration conf, T[] splits) throws IOException { final FileSystem fs = FileSystem.getLocal(conf); final Path testdir = new Path(System.getProperty("test.build.data", "/tmp") - ).makeQualified(fs); + ).makeQualified( + fs.getUri(), + fs.getWorkingDirectory()); Path p = new Path(testdir, testname + "/_partition.lst"); TotalOrderPartitioner.setPartitionFile(conf, p); conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1); SequenceFile.Writer w = null; try { - w = SequenceFile.createWriter(fs, conf, p, - splits[0].getClass(), NullWritable.class, - SequenceFile.CompressionType.NONE); + w = SequenceFile.createWriter( + conf, + SequenceFile.Writer.file(p), + SequenceFile.Writer.keyClass(splits[0].getClass()), + SequenceFile.Writer.valueClass(NullWritable.class), + SequenceFile.Writer.compression(CompressionType.NONE)); 
for (int i = 0; i < splits.length; ++i) { w.append(splits[i], NullWritable.get()); } @@ -99,6 +140,31 @@ public class TestTotalOrderPartitioner e return p; } + public void testTotalOrderWithCustomSerialization() throws Exception { + TotalOrderPartitioner partitioner = + new TotalOrderPartitioner(); + Configuration conf = new Configuration(); + conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, + JavaSerialization.class.getName(), + WritableSerialization.class.getName()); + conf.setClass(MRJobConfig.KEY_COMPARATOR, + JavaSerializationComparator.class, + Comparator.class); + Path p = TestTotalOrderPartitioner.writePartitionFile( + "totalordercustomserialization", conf, splitJavaStrings); + conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class); + try { + partitioner.setConf(conf); + NullWritable nw = NullWritable.get(); + for (Check chk : testJavaStrings) { + assertEquals(chk.data.toString(), chk.part, + partitioner.getPartition(chk.data, nw, splitJavaStrings.length + 1)); + } + } finally { + p.getFileSystem(conf).delete(p, true); + } + } + public void testTotalOrderMemCmp() throws Exception { TotalOrderPartitioner partitioner = new TotalOrderPartitioner(); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Fri Oct 19 02:25:55 2012 @@ -88,8 +88,10 @@ public class TestUmbilicalProtocolWithJo .when(mockTT).getProtocolSignature(anyString(), anyLong(), anyInt()); JobTokenSecretManager sm = new JobTokenSecretManager(); - final Server server = RPC.getServer(TaskUmbilicalProtocol.class, mockTT, - ADDRESS, 0, 5, true, conf, sm); + final Server server = new RPC.Builder(conf) + .setProtocol(TaskUmbilicalProtocol.class).setInstance(mockTT) + .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true) + .setSecretManager(sm).build(); server.start(); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Fri Oct 19 02:25:55 2012 @@ -72,8 +72,10 @@ public class MiniMRYarnCluster extends M @Override public void init(Configuration conf) { conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); - conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), - "apps_staging_dir/").getAbsolutePath()); + if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { + conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), + "apps_staging_dir/").getAbsolutePath()); + } conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); try { @@ -113,10 +115,6 @@ public class MiniMRYarnCluster extends M // for corresponding uberized tests. conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - // Set config for JH Server - conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, - JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS); - super.init(conf); } @@ -128,10 +126,15 @@ public class MiniMRYarnCluster extends M @Override public synchronized void start() { try { - getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS, - MiniYARNCluster.getHostname() + ":0"); - getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, - MiniYARNCluster.getHostname() + ":0"); + if (!getConfig().getBoolean( + JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, + JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) { + // pick free random ports. + getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS, + MiniYARNCluster.getHostname() + ":0"); + getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, + MiniYARNCluster.getHostname() + ":0"); + } historyServer = new JobHistoryServer(); historyServer.init(getConfig()); new Thread() { Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Fri Oct 19 02:25:55 2012 @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -26,6 +27,7 @@ import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,10 +41,10 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import 
org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -66,6 +68,7 @@ import org.apache.hadoop.mapreduce.lib.o import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.AfterClass; import org.junit.Assert; @@ -77,15 +80,24 @@ public class TestMRJobs { private static final Log LOG = LogFactory.getLog(TestMRJobs.class); protected static MiniMRYarnCluster mrCluster; + protected static MiniDFSCluster dfsCluster; private static Configuration conf = new Configuration(); private static FileSystem localFs; + private static FileSystem remoteFs; static { try { localFs = FileSystem.getLocal(conf); } catch (IOException io) { throw new RuntimeException("problem getting local fs", io); } + try { + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .format(true).racks(null).build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } } private static Path TEST_ROOT_DIR = new Path("target", @@ -104,6 +116,8 @@ public class TestMRJobs { if (mrCluster == null) { mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3); Configuration conf = new Configuration(); + conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); mrCluster.init(conf); mrCluster.start(); } @@ -120,6 +134,10 @@ public class TestMRJobs { mrCluster.stop(); mrCluster = null; } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } } @Test @@ -211,6 +229,7 @@ public class TestMRJobs { Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output"); FileOutputFormat.setOutputPath(job, outputDir); + job.setSpeculativeExecution(false); job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. 
    job.setJarByClass(RandomTextWriterJob.class);
    job.setMaxMapAttempts(1); // speed up failures
@@ -399,20 +418,20 @@ public class TestMRJobs {
       Configuration conf = context.getConfiguration();
       Path[] files = context.getLocalCacheFiles();
       Path[] archives = context.getLocalCacheArchives();
-      FileSystem fs = LocalFileSystem.get(conf);
-      // Check that 3(2+ appjar) files and 2 archives are present
-      Assert.assertEquals(3, files.length);
+      // Check that 4 (2 + appjar + DistributedCacheChecker jar) files
+      // and 2 archives are present
+      Assert.assertEquals(4, files.length);
       Assert.assertEquals(2, archives.length);
       // Check lengths of the files
-      Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen());
-      Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+      Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
+      Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
       // Check extraction of the archive
-      Assert.assertTrue(fs.exists(new Path(archives[0],
+      Assert.assertTrue(localFs.exists(new Path(archives[0],
           "distributed.jar.inside3")));
-      Assert.assertTrue(fs.exists(new Path(archives[1],
+      Assert.assertTrue(localFs.exists(new Path(archives[1],
           "distributed.jar.inside4")));
       // Check the class loaders
@@ -423,16 +442,27 @@ public class TestMRJobs {
       Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
+      // The Job Jar should have been extracted to a folder named "job.jar" and
+      // added to the classpath; the two jar files in the lib folder in the Job
+      // Jar should have also been added to the classpath
+      Assert.assertNotNull(cl.getResource("job.jar/"));
+      Assert.assertNotNull(cl.getResource("job.jar/lib/lib1.jar"));
+      Assert.assertNotNull(cl.getResource("job.jar/lib/lib2.jar"));
       // Check that the symlink for the renaming was created in the cwd;
       File symlinkFile = new File("distributed.first.symlink");
       Assert.assertTrue(symlinkFile.exists());
       Assert.assertEquals(1, symlinkFile.length());
+
+      // Check that the symlink for the Job Jar was created in the cwd and
+      // points to the extracted directory
+      File jobJarDir = new File("job.jar");
+      Assert.assertTrue(FileUtils.isSymlink(jobJarDir));
+      Assert.assertTrue(jobJarDir.isDirectory());
     }
   }
-  @Test
-  public void testDistributedCache() throws Exception {
+  public void _testDistributedCache(String jobJarPath) throws Exception {
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
Not running test."); @@ -450,7 +480,17 @@ public class TestMRJobs { makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); Job job = Job.getInstance(mrCluster.getConfig()); - job.setJarByClass(DistributedCacheChecker.class); + + // Set the job jar to a new "dummy" jar so we can check that its extracted + // properly + job.setJar(jobJarPath); + // Because the job jar is a "dummy" jar, we need to include the jar with + // DistributedCacheChecker or it won't be able to find it + Path distributedCacheCheckerJar = new Path( + JarFinder.getJar(DistributedCacheChecker.class)); + job.addFileToClassPath(distributedCacheCheckerJar.makeQualified( + localFs.getUri(), distributedCacheCheckerJar.getParent())); + job.setMapperClass(DistributedCacheChecker.class); job.setOutputFormatClass(NullOutputFormat.class); @@ -459,10 +499,11 @@ public class TestMRJobs { job.addCacheFile( new URI(first.toUri().toString() + "#distributed.first.symlink")); job.addFileToClassPath(second); - job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + // The AppMaster jar itself + job.addFileToClassPath( + APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent())); job.addArchiveToClassPath(third); job.addCacheArchive(fourth.toUri()); - job.createSymlink(); job.setMaxMapAttempts(1); // speed up failures job.submit(); @@ -473,6 +514,23 @@ public class TestMRJobs { " but didn't Match Job ID " + jobId , trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); } + + @Test + public void testDistributedCache() throws Exception { + // Test with a local (file:///) Job Jar + Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()); + _testDistributedCache(localJobJarPath.toUri().toString()); + + // Test with a remote (hdfs://) Job Jar + Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/", + localJobJarPath.getName()); + remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath); + File localJobJarFile = new File(localJobJarPath.toUri().toString()); + if (localJobJarFile.exists()) { // just to make sure + localJobJarFile.delete(); + } + _testDistributedCache(remoteJobJarPath.toUri().toString()); + } private Path createTempFile(String filename, String contents) throws IOException { @@ -497,4 +555,45 @@ public class TestMRJobs { localFs.setPermission(p, new FsPermission("700")); return p; } + + private Path makeJobJarWithLib(String testDir) throws FileNotFoundException, + IOException{ + Path jobJarPath = new Path(testDir, "thejob.jar"); + FileOutputStream fos = + new FileOutputStream(new File(jobJarPath.toUri().getPath())); + JarOutputStream jos = new JarOutputStream(fos); + // Have to put in real jar files or it will complain + createAndAddJarToJar(jos, new File( + new Path(testDir, "lib1.jar").toUri().getPath())); + createAndAddJarToJar(jos, new File( + new Path(testDir, "lib2.jar").toUri().getPath())); + jos.close(); + localFs.setPermission(jobJarPath, new FsPermission("700")); + return jobJarPath; + } + + private void createAndAddJarToJar(JarOutputStream jos, File jarFile) + throws FileNotFoundException, IOException { + FileOutputStream fos2 = new FileOutputStream(jarFile); + JarOutputStream jos2 = new JarOutputStream(fos2); + // Have to have at least one entry or it will complain + ZipEntry ze = new ZipEntry("lib1.inside"); + jos2.putNextEntry(ze); + jos2.closeEntry(); + jos2.close(); + ze = new ZipEntry("lib/" + jarFile.getName()); + jos.putNextEntry(ze); + FileInputStream in = new FileInputStream(jarFile); + byte buf[] = new byte[1024]; + int numRead; + do { + 
numRead = in.read(buf); + if (numRead >= 0) { + jos.write(buf, 0, numRead); + } + } while (numRead != -1); + in.close(); + jos.closeEntry(); + jarFile.delete(); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java Fri Oct 19 02:25:55 2012 @@ -301,7 +301,6 @@ public class TestSpeculativeExecution { // Creates the Job Configuration job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. - job.createSymlink(); job.setMaxMapAttempts(2); job.submit(); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java Fri Oct 19 02:25:55 2012 @@ -177,8 +177,13 @@ public class TestYARNRunner extends Test @Test public void testResourceMgrDelegate() throws Exception { /* we not want a mock of resourcemgr deleagte */ - ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); - ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol); + final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); + ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) { + @Override + public synchronized void start() { + this.rmClient = clientRMProtocol; + } + }; /* make sure kill calls finish application master */ when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class))) .thenReturn(null); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java Fri Oct 19 02:25:55 2012 @@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.TestSequ import org.apache.hadoop.mapred.TestTextInputFormat; import org.apache.hadoop.mapred.ThreadedMapBenchmark; import org.apache.hadoop.mapreduce.FailJob; +import org.apache.hadoop.mapreduce.MiniHadoopClusterManager; import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ProgramDriver; @@ -101,6 +102,8 @@ public class MapredTestDriver { "Job History Log analyzer."); pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, "HDFS Stress Test and Live Data Verification."); + pgd.addClass("minicluster", MiniHadoopClusterManager.class, + "Single process HDFS and MR cluster."); } catch(Throwable e) { e.printStackTrace(); } Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Fri Oct 19 02:25:55 2012 @@ -55,7 +55,10 @@ import org.apache.hadoop.fs.LocalDirAllo import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; @@ -84,9 +87,7 @@ import org.jboss.netty.channel.ChannelHa import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.FileRegion; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.group.ChannelGroup; @@ -101,6 +102,7 @@ import org.jboss.netty.handler.codec.htt import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; +import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; @@ -110,10 +112,27 @@ public class ShuffleHandler extends 
Abst implements AuxServices.AuxiliaryService { private static final Log LOG = LogFactory.getLog(ShuffleHandler.class); + + public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache"; + public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; + + public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; + public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; private int port; private ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); + private HttpPipelineFactory pipelineFact; + private int sslFileBufferSize; + + /** + * Should the shuffle use posix_fadvise calls to manage the OS cache during + * sendfile + */ + private boolean manageOsCache; + private int readaheadLength; + private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); + public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -126,6 +145,11 @@ public class ShuffleHandler extends Abst public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 8080; + public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = + "mapreduce.shuffle.ssl.file.buffer.size"; + + public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; + @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @Metric("Shuffle output in bytes") @@ -231,6 +255,12 @@ public class ShuffleHandler extends Abst @Override public synchronized void init(Configuration conf) { + manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, + DEFAULT_SHUFFLE_MANAGE_OS_CACHE); + + readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, + DEFAULT_SHUFFLE_READAHEAD_BYTES); + ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -249,7 +279,11 @@ public class ShuffleHandler extends Abst public synchronized void start() { Configuration conf = getConfig(); ServerBootstrap bootstrap = new ServerBootstrap(selector); - HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf); + try { + pipelineFact = new HttpPipelineFactory(conf); + } catch (Exception ex) { + throw new RuntimeException(ex); + } bootstrap.setPipelineFactory(pipelineFact); port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); Channel ch = bootstrap.bind(new InetSocketAddress(port)); @@ -259,6 +293,9 @@ public class ShuffleHandler extends Abst pipelineFact.SHUFFLE.setPort(port); LOG.info(getName() + " listening on port " + port); super.start(); + + sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, + DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); } @Override @@ -266,6 +303,7 @@ public class ShuffleHandler extends Abst accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); ServerBootstrap bootstrap = new ServerBootstrap(selector); bootstrap.releaseExternalResources(); + pipelineFact.destroy(); super.stop(); } @@ -283,22 +321,38 @@ public class ShuffleHandler extends Abst class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; + private SSLFactory sslFactory; - public HttpPipelineFactory(Configuration conf) { + public HttpPipelineFactory(Configuration conf) throws Exception { SHUFFLE = new Shuffle(conf); + if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, + MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { + sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); + sslFactory.init(); + } + } + + public 
void destroy() { + if (sslFactory != null) { + sslFactory.destroy(); + } } @Override public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new HttpRequestDecoder(), - new HttpChunkAggregator(1 << 16), - new HttpResponseEncoder(), - new ChunkedWriteHandler(), - SHUFFLE); - // TODO factor security manager into pipeline - // TODO factor out encode/decode to permit binary shuffle - // TODO factor out decode of index to permit alt. models + ChannelPipeline pipeline = Channels.pipeline(); + if (sslFactory != null) { + pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); + } + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunking", new ChunkedWriteHandler()); + pipeline.addLast("shuffle", SHUFFLE); + return pipeline; + // TODO factor security manager into pipeline + // TODO factor out encode/decode to permit binary shuffle + // TODO factor out decode of index to permit alt. models } } @@ -468,14 +522,14 @@ public class ShuffleHandler extends Abst base + "/file.out", conf); LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " + indexFileName); - IndexRecord info = + final IndexRecord info = indexCache.getIndexInformation(mapId, reduce, indexFileName, user); final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); final DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); - File spillfile = new File(mapOutputFileName.toString()); + final File spillfile = new File(mapOutputFileName.toString()); RandomAccessFile spill; try { spill = new RandomAccessFile(spillfile, "r"); @@ -483,17 +537,28 @@ public class ShuffleHandler extends Abst LOG.info(spillfile + " not found"); return null; } - final FileRegion partition = new DefaultFileRegion( - spill.getChannel(), info.startOffset, info.partLength); - ChannelFuture writeFuture = ch.write(partition); - writeFuture.addListener(new ChannelFutureListener() { - // TODO error handling; distinguish IO/connection failures, - // attribute to appropriate spill output + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) == null) { + final FadvisedFileRegion partition = new FadvisedFileRegion(spill, + info.startOffset, info.partLength, manageOsCache, readaheadLength, + readaheadPool, spillfile.getAbsolutePath()); + writeFuture = ch.write(partition); + writeFuture.addListener(new ChannelFutureListener() { + // TODO error handling; distinguish IO/connection failures, + // attribute to appropriate spill output @Override public void operationComplete(ChannelFuture future) { partition.releaseExternalResources(); } }); + } else { + // HTTPS cannot be done with zero copy. 
+ final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, + info.startOffset, info.partLength, sslFileBufferSize, + manageOsCache, readaheadLength, readaheadPool, + spillfile.getAbsolutePath()); + writeFuture = ch.write(chunk); + } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(info.partLength); // optimistic return writeFuture; Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Fri Oct 19 02:25:55 2012 @@ -133,7 +133,31 @@ org.jboss.netty netty - + + commons-logging + commons-logging + provided + + + com.google.guava + guava + provided + + + commons-codec + commons-codec + provided + + + commons-cli + commons-cli + provided + + + commons-lang + commons-lang + provided + @@ -148,6 +172,18 @@ Max + + org.apache.maven.plugins + maven-surefire-plugin + + + + listener + org.apache.hadoop.test.TimedOutTestsListener + + + + Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml Fri Oct 19 02:25:55 2012 @@ -36,6 +36,14 @@ + commons-cli + commons-cli + + + commons-logging + commons-logging + + org.apache.hadoop hadoop-mapreduce-client-jobclient provided @@ -88,17 +96,12 @@ org.apache.hadoop hadoop-mapreduce-client-hs - provided - - - org.apache.hadoop - hadoop-mapreduce-client-hs test org.hsqldb hsqldb - 2.0.0 + provided Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java Fri Oct 19 02:25:55 2012 @@ -27,7 +27,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Iterator; import java.util.Random; import org.apache.commons.logging.Log; @@ -65,6 +64,16 @@ import org.hsqldb.server.Server; * * When called with no arguments the program starts a local HSQLDB server, and * uses this database for storing/retrieving the data. + *
+ * This program requires some additional configuration relating to HSQLDB.
+ * The hsqldb jar should be added to the classpath:
+ *
+ * export HADOOP_CLASSPATH=share/hadoop/mapreduce/lib-examples/hsqldb-2.0.0.jar
+ *
+ * And the hsqldb jar should be included with the -libjars
+ * argument when executing it with hadoop:
+ *
+ * -libjars share/hadoop/mapreduce/lib-examples/hsqldb-2.0.0.jar */ public class DBCountPageView extends Configured implements Tool { @@ -72,6 +81,7 @@ public class DBCountPageView extends Con private Connection connection; private boolean initialized = false; + private boolean isOracle = false; private static final String[] AccessFieldNames = {"url", "referrer", "time"}; private static final String[] PageviewFieldNames = {"url", "pageview"}; @@ -92,7 +102,9 @@ public class DBCountPageView extends Con private void createConnection(String driverClassName , String url) throws Exception { - + if(driverClassName.toLowerCase().contains("oracle")) { + isOracle = true; + } Class.forName(driverClassName); connection = DriverManager.getConnection(url); connection.setAutoCommit(false); @@ -132,7 +144,7 @@ public class DBCountPageView extends Con } private void dropTables() { - String dropAccess = "DROP TABLE Access"; + String dropAccess = "DROP TABLE HAccess"; String dropPageview = "DROP TABLE Pageview"; Statement st = null; try { @@ -147,18 +159,21 @@ public class DBCountPageView extends Con } private void createTables() throws SQLException { - + String dataType = "BIGINT NOT NULL"; + if(isOracle) { + dataType = "NUMBER(19) NOT NULL"; + } String createAccess = "CREATE TABLE " + - "Access(url VARCHAR(100) NOT NULL," + + "HAccess(url VARCHAR(100) NOT NULL," + " referrer VARCHAR(100)," + - " time BIGINT NOT NULL, " + + " time " + dataType + ", " + " PRIMARY KEY (url, time))"; String createPageview = "CREATE TABLE " + "Pageview(url VARCHAR(100) NOT NULL," + - " pageview BIGINT NOT NULL, " + + " pageview " + dataType + ", " + " PRIMARY KEY (url))"; Statement st = connection.createStatement(); @@ -179,7 +194,7 @@ public class DBCountPageView extends Con PreparedStatement statement = null ; try { statement = connection.prepareStatement( - "INSERT INTO Access(url, referrer, time)" + + "INSERT INTO HAccess(url, referrer, time)" + " VALUES (?, ?, ?)"); Random random = new Random(); @@ -238,7 +253,7 @@ public class DBCountPageView extends Con /**Verifies the results are correct */ private boolean verify() throws SQLException { //check total num pageview - String countAccessQuery = "SELECT COUNT(*) FROM Access"; + String countAccessQuery = "SELECT COUNT(*) FROM HAccess"; String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview"; Statement st = null; ResultSet rs = null; @@ -386,7 +401,7 @@ public class DBCountPageView extends Con DBConfiguration.configureDB(conf, driverClassName, url); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJobName("Count Pageviews of URLs"); job.setJarByClass(DBCountPageView.class); @@ -394,7 +409,7 @@ public class DBCountPageView extends Con job.setCombinerClass(LongSumReducer.class); job.setReducerClass(PageviewReducer.class); - DBInputFormat.setInput(job, AccessRecord.class, "Access" + DBInputFormat.setInput(job, AccessRecord.class, "HAccess" , null, "url", AccessFieldNames); DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java Fri Oct 19 02:25:55 2012 @@ -211,7 +211,7 @@ public class SecondarySort { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { - System.err.println("Usage: secondarysrot "); + System.err.println("Usage: secondarysort "); System.exit(2); } Job job = new Job(conf, "secondary sort"); Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java Fri Oct 19 02:25:55 2012 @@ -167,7 +167,6 @@ public class Sort extends Configure URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning"); DistributedCache.addCacheFile(partitionUri, conf); - DistributedCache.createSymlink(conf); } System.out.println("Running on " + Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java Fri Oct 19 02:25:55 2012 @@ -1,196 +1,196 @@ -package org.apache.hadoop.examples; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.StringTokenizer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -public class WordMean extends Configured implements Tool { - - private double mean = 0; - - private final static Text COUNT = new Text("count"); - private final static Text LENGTH = new Text("length"); - private final static LongWritable ONE = new LongWritable(1); - - /** - * Maps words from line of text into 2 key-value pairs; one key-value pair for - * counting the word, another for counting its length. - */ - public static class WordMeanMapper extends - Mapper { - - private LongWritable wordLen = new LongWritable(); - - /** - * Emits 2 key-value pairs for counting the word and its length. Outputs are - * (Text, LongWritable). - * - * @param value - * This will be a line of text coming in from our input file. - */ - public void map(Object key, Text value, Context context) - throws IOException, InterruptedException { - StringTokenizer itr = new StringTokenizer(value.toString()); - while (itr.hasMoreTokens()) { - String string = itr.nextToken(); - this.wordLen.set(string.length()); - context.write(LENGTH, this.wordLen); - context.write(COUNT, ONE); - } - } - } - - /** - * Performs integer summation of all the values for each key. - */ - public static class WordMeanReducer extends - Reducer { - - private LongWritable sum = new LongWritable(); - - /** - * Sums all the individual values within the iterator and writes them to the - * same key. - * - * @param key - * This will be one of 2 constants: LENGTH_STR or COUNT_STR. - * @param values - * This will be an iterator of all the values associated with that - * key. - */ - public void reduce(Text key, Iterable values, Context context) - throws IOException, InterruptedException { - - int theSum = 0; - for (LongWritable val : values) { - theSum += val.get(); - } - sum.set(theSum); - context.write(key, sum); - } - } - - /** - * Reads the output file and parses the summation of lengths, and the word - * count, to perform a quick calculation of the mean. - * - * @param path - * The path to find the output file in. Set in main to the output - * directory. - * @throws IOException - * If it cannot access the output directory, we throw an exception. 
- */ - private double readAndCalcMean(Path path, Configuration conf) - throws IOException { - FileSystem fs = FileSystem.get(conf); - Path file = new Path(path, "part-r-00000"); - - if (!fs.exists(file)) - throw new IOException("Output not found!"); - - BufferedReader br = null; - - // average = total sum / number of elements; - try { - br = new BufferedReader(new InputStreamReader(fs.open(file))); - - long count = 0; - long length = 0; - - String line; - while ((line = br.readLine()) != null) { - StringTokenizer st = new StringTokenizer(line); - - // grab type - String type = st.nextToken(); - - // differentiate - if (type.equals(COUNT.toString())) { - String countLit = st.nextToken(); - count = Long.parseLong(countLit); - } else if (type.equals(LENGTH.toString())) { - String lengthLit = st.nextToken(); - length = Long.parseLong(lengthLit); - } - } - - double theMean = (((double) length) / ((double) count)); - System.out.println("The mean is: " + theMean); - return theMean; - } finally { - br.close(); - } - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new WordMean(), args); - } - - @Override - public int run(String[] args) throws Exception { - if (args.length != 2) { - System.err.println("Usage: wordmean "); - return 0; - } - - Configuration conf = getConf(); - - @SuppressWarnings("deprecation") - Job job = new Job(conf, "word mean"); - job.setJarByClass(WordMean.class); - job.setMapperClass(WordMeanMapper.class); - job.setCombinerClass(WordMeanReducer.class); - job.setReducerClass(WordMeanReducer.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - FileInputFormat.addInputPath(job, new Path(args[0])); - Path outputpath = new Path(args[1]); - FileOutputFormat.setOutputPath(job, outputpath); - boolean result = job.waitForCompletion(true); - mean = readAndCalcMean(outputpath, conf); - - return (result ? 0 : 1); - } - - /** - * Only valuable after run() called. - * - * @return Returns the mean value. - */ - public double getMean() { - return mean; - } +package org.apache.hadoop.examples; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class WordMean extends Configured implements Tool { + + private double mean = 0; + + private final static Text COUNT = new Text("count"); + private final static Text LENGTH = new Text("length"); + private final static LongWritable ONE = new LongWritable(1); + + /** + * Maps words from line of text into 2 key-value pairs; one key-value pair for + * counting the word, another for counting its length. + */ + public static class WordMeanMapper extends + Mapper { + + private LongWritable wordLen = new LongWritable(); + + /** + * Emits 2 key-value pairs for counting the word and its length. Outputs are + * (Text, LongWritable). + * + * @param value + * This will be a line of text coming in from our input file. + */ + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + String string = itr.nextToken(); + this.wordLen.set(string.length()); + context.write(LENGTH, this.wordLen); + context.write(COUNT, ONE); + } + } + } + + /** + * Performs integer summation of all the values for each key. + */ + public static class WordMeanReducer extends + Reducer { + + private LongWritable sum = new LongWritable(); + + /** + * Sums all the individual values within the iterator and writes them to the + * same key. + * + * @param key + * This will be one of 2 constants: LENGTH_STR or COUNT_STR. + * @param values + * This will be an iterator of all the values associated with that + * key. + */ + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + + int theSum = 0; + for (LongWritable val : values) { + theSum += val.get(); + } + sum.set(theSum); + context.write(key, sum); + } + } + + /** + * Reads the output file and parses the summation of lengths, and the word + * count, to perform a quick calculation of the mean. + * + * @param path + * The path to find the output file in. Set in main to the output + * directory. + * @throws IOException + * If it cannot access the output directory, we throw an exception. 
+ */ + private double readAndCalcMean(Path path, Configuration conf) + throws IOException { + FileSystem fs = FileSystem.get(conf); + Path file = new Path(path, "part-r-00000"); + + if (!fs.exists(file)) + throw new IOException("Output not found!"); + + BufferedReader br = null; + + // average = total sum / number of elements; + try { + br = new BufferedReader(new InputStreamReader(fs.open(file))); + + long count = 0; + long length = 0; + + String line; + while ((line = br.readLine()) != null) { + StringTokenizer st = new StringTokenizer(line); + + // grab type + String type = st.nextToken(); + + // differentiate + if (type.equals(COUNT.toString())) { + String countLit = st.nextToken(); + count = Long.parseLong(countLit); + } else if (type.equals(LENGTH.toString())) { + String lengthLit = st.nextToken(); + length = Long.parseLong(lengthLit); + } + } + + double theMean = (((double) length) / ((double) count)); + System.out.println("The mean is: " + theMean); + return theMean; + } finally { + br.close(); + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new WordMean(), args); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("Usage: wordmean "); + return 0; + } + + Configuration conf = getConf(); + + @SuppressWarnings("deprecation") + Job job = new Job(conf, "word mean"); + job.setJarByClass(WordMean.class); + job.setMapperClass(WordMeanMapper.class); + job.setCombinerClass(WordMeanReducer.class); + job.setReducerClass(WordMeanReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + FileInputFormat.addInputPath(job, new Path(args[0])); + Path outputpath = new Path(args[1]); + FileOutputFormat.setOutputPath(job, outputpath); + boolean result = job.waitForCompletion(true); + mean = readAndCalcMean(outputpath, conf); + + return (result ? 0 : 1); + } + + /** + * Only valuable after run() called. + * + * @return Returns the mean value. + */ + public double getMean() { + return mean; + } } \ No newline at end of file Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java Fri Oct 19 02:25:55 2012 @@ -1,208 +1,208 @@ -package org.apache.hadoop.examples; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class WordMedian extends Configured implements Tool {
-
-  private double median = 0;
-  private final static IntWritable ONE = new IntWritable(1);
-
-  /**
-   * Maps words from line of text into a key-value pair; the length of the word
-   * as the key, and 1 as the value.
-   */
-  public static class WordMedianMapper extends
-      Mapper<Object, Text, IntWritable, IntWritable> {
-
-    private IntWritable length = new IntWritable();
-
-    /**
-     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
-     * IntWritable).
-     *
-     * @param value
-     *          This will be a line of text coming in from our input file.
-     */
-    public void map(Object key, Text value, Context context)
-        throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        String string = itr.nextToken();
-        length.set(string.length());
-        context.write(length, ONE);
-      }
-    }
-  }
-
-  /**
-   * Performs integer summation of all the values for each key.
-   */
-  public static class WordMedianReducer extends
-      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
-
-    private IntWritable val = new IntWritable();
-
-    /**
-     * Sums all the individual values within the iterator and writes them to the
-     * same key.
-     *
-     * @param key
-     *          This will be a length of a word that was read.
-     * @param values
-     *          This will be an iterator of all the values associated with that
-     *          key.
-     */
-    public void reduce(IntWritable key, Iterable<IntWritable> values,
-        Context context) throws IOException, InterruptedException {
-
-      int sum = 0;
-      for (IntWritable value : values) {
-        sum += value.get();
-      }
-      val.set(sum);
-      context.write(key, val);
-    }
-  }
-
-  /**
-   * This is a standard program to read and find a median value based on a file
-   * of word counts such as: 1 456, 2 132, 3 56... Where the first values are
-   * the word lengths and the following values are the number of times that
-   * words of that length appear.
-   *
-   * @param path
-   *          The path to read the HDFS file from (part-r-00000...00001...etc).
-   * @param medianIndex1
-   *          The first length value to look for.
-   * @param medianIndex2
-   *          The second length value to look for (will be the same as the first
-   *          if there are an even number of words total).
-   * @throws IOException
-   *           If file cannot be found, we throw an exception.
-   * */
-  private double readAndFindMedian(String path, int medianIndex1,
-      int medianIndex2, Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(path, "part-r-00000");
-
-    if (!fs.exists(file))
-      throw new IOException("Output not found!");
-
-    BufferedReader br = null;
-
-    try {
-      br = new BufferedReader(new InputStreamReader(fs.open(file)));
-      int num = 0;
-
-      String line;
-      while ((line = br.readLine()) != null) {
-        StringTokenizer st = new StringTokenizer(line);
-
-        // grab length
-        String currLen = st.nextToken();
-
-        // grab count
-        String lengthFreq = st.nextToken();
-
-        int prevNum = num;
-        num += Integer.parseInt(lengthFreq);
-
-        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
-          System.out.println("The median is: " + currLen);
-          br.close();
-          return Double.parseDouble(currLen);
-        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
-          String nextCurrLen = st.nextToken();
-          double theMedian = (Integer.parseInt(currLen) + Integer
-              .parseInt(nextCurrLen)) / 2.0;
-          System.out.println("The median is: " + theMedian);
-          br.close();
-          return theMedian;
-        }
-      }
-    } finally {
-      br.close();
-    }
-    // error, no median found
-    return -1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new WordMedian(), args);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: wordmedian <in> <out>");
-      return 0;
-    }
-
-    setConf(new Configuration());
-    Configuration conf = getConf();
-
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word median");
-    job.setJarByClass(WordMedian.class);
-    job.setMapperClass(WordMedianMapper.class);
-    job.setCombinerClass(WordMedianReducer.class);
-    job.setReducerClass(WordMedianReducer.class);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(IntWritable.class);
-    FileInputFormat.addInputPath(job, new Path(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
-    boolean result = job.waitForCompletion(true);
-
-    // Wait for JOB 1 -- get middle value to check for Median
-
-    long totalWords = job.getCounters()
-        .getGroup(TaskCounter.class.getCanonicalName())
-        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
-    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
-    int medianIndex2 = (int) Math.floor((totalWords / 2.0));
-
-    median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
-
-    return (result ? 0 : 1);
-  }
-
-  public double getMedian() {
-    return median;
-  }
-}
+package org.apache.hadoop.examples;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WordMedian extends Configured implements Tool {
+
+  private double median = 0;
+  private final static IntWritable ONE = new IntWritable(1);
+
+  /**
+   * Maps words from line of text into a key-value pair; the length of the word
+   * as the key, and 1 as the value.
+   */
+  public static class WordMedianMapper extends
+      Mapper<Object, Text, IntWritable, IntWritable> {
+
+    private IntWritable length = new IntWritable();
+
+    /**
+     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
+     * IntWritable).
+     *
+     * @param value
+     *          This will be a line of text coming in from our input file.
+     */
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        String string = itr.nextToken();
+        length.set(string.length());
+        context.write(length, ONE);
+      }
+    }
+  }
+
+  /**
+   * Performs integer summation of all the values for each key.
+   */
+  public static class WordMedianReducer extends
+      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+    private IntWritable val = new IntWritable();
+
+    /**
+     * Sums all the individual values within the iterator and writes them to the
+     * same key.
+     *
+     * @param key
+     *          This will be a length of a word that was read.
+     * @param values
+     *          This will be an iterator of all the values associated with that
+     *          key.
+     */
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+
+      int sum = 0;
+      for (IntWritable value : values) {
+        sum += value.get();
+      }
+      val.set(sum);
+      context.write(key, val);
+    }
+  }
+
+  /**
+   * This is a standard program to read and find a median value based on a file
+   * of word counts such as: 1 456, 2 132, 3 56... Where the first values are
+   * the word lengths and the following values are the number of times that
+   * words of that length appear.
+   *
+   * @param path
+   *          The path to read the HDFS file from (part-r-00000...00001...etc).
+   * @param medianIndex1
+   *          The first length value to look for.
+   * @param medianIndex2
+   *          The second length value to look for (will be the same as the first
+   *          if there are an even number of words total).
+   * @throws IOException
+   *           If file cannot be found, we throw an exception.
+   * */
+  private double readAndFindMedian(String path, int medianIndex1,
+      int medianIndex2, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(path, "part-r-00000");
+
+    if (!fs.exists(file))
+      throw new IOException("Output not found!");
+
+    BufferedReader br = null;
+
+    try {
+      br = new BufferedReader(new InputStreamReader(fs.open(file)));
+      int num = 0;
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        StringTokenizer st = new StringTokenizer(line);
+
+        // grab length
+        String currLen = st.nextToken();
+
+        // grab count
+        String lengthFreq = st.nextToken();
+
+        int prevNum = num;
+        num += Integer.parseInt(lengthFreq);
+
+        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
+          System.out.println("The median is: " + currLen);
+          br.close();
+          return Double.parseDouble(currLen);
+        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
+          String nextCurrLen = st.nextToken();
+          double theMedian = (Integer.parseInt(currLen) + Integer
+              .parseInt(nextCurrLen)) / 2.0;
+          System.out.println("The median is: " + theMedian);
+          br.close();
+          return theMedian;
+        }
+      }
+    } finally {
+      br.close();
+    }
+    // error, no median found
+    return -1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new WordMedian(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: wordmedian <in> <out>");
+      return 0;
+    }
+
+    setConf(new Configuration());
+    Configuration conf = getConf();
+
+    @SuppressWarnings("deprecation")
+    Job job = new Job(conf, "word median");
+    job.setJarByClass(WordMedian.class);
+    job.setMapperClass(WordMedianMapper.class);
+    job.setCombinerClass(WordMedianReducer.class);
+    job.setReducerClass(WordMedianReducer.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    boolean result = job.waitForCompletion(true);
+
+    // Wait for JOB 1 -- get middle value to check for Median
+
+    long totalWords = job.getCounters()
+        .getGroup(TaskCounter.class.getCanonicalName())
+        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
+    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
+    int medianIndex2 = (int) Math.floor((totalWords / 2.0));
+
+    median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
+
+    return (result ? 0 : 1);
+  }
+
+  public double getMedian() {
+    return median;
+  }
+}
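
Note on using the two examples above: both tools expose their result through an accessor (getMean() / getMedian()) that is populated inside run(). The following driver is a minimal sketch, not part of this commit; the class name WordStatsDriver is hypothetical, and the three path arguments (one input directory plus a fresh, non-existent output directory per job) are placeholders.

// Hypothetical driver that chains WordMean and WordMedian over the same input.
// Assumes args[0] = input dir, args[1] and args[2] = unused output dirs.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordMean;
import org.apache.hadoop.examples.WordMedian;
import org.apache.hadoop.util.ToolRunner;

public class WordStatsDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Run the mean job; ToolRunner injects conf and invokes WordMean.run().
    WordMean mean = new WordMean();
    ToolRunner.run(conf, mean, new String[] { args[0], args[1] });

    // Run the median job against the same input, with its own output dir.
    WordMedian median = new WordMedian();
    ToolRunner.run(conf, median, new String[] { args[0], args[2] });

    // The accessors are only meaningful after run() has completed.
    System.out.println("mean word length   = " + mean.getMean());
    System.out.println("median word length = " + median.getMedian());
  }
}

Because each ToolRunner.run(...) call returns only after waitForCompletion(true) has finished inside the corresponding run() method, reading the two statistics afterwards is safe.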