hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082631 - in /hadoop/mapreduce/branches/yahoo-merge/src: java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/task/reduce/ test/ test/mapred/org/apache/hadoop/mapred/
Date Thu, 17 Mar 2011 18:43:03 GMT
Author: acmurthy
Date: Thu Mar 17 18:43:03 2011
New Revision: 1082631

URL: http://svn.apache.org/viewvc?rev=1082631&view=rev
Log:
Fix collisions for shuffle input

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred-site.xml
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Mar 17 18:43:03 2011
@@ -225,7 +225,7 @@ public class LocalJobRunner implements C
               getShortUserName());
           TaskRunner.setupChildMapredLocalDirs(map, localConf);
 
-          MapOutputFile mapOutput = new MapOutputFile();
+          MapOutputFile mapOutput = new MROutputFiles();
           mapOutput.setConf(localConf);
           mapOutputFiles.put(mapId, mapOutput);
 
@@ -389,7 +389,7 @@ public class LocalJobRunner implements C
               if (!this.isInterrupted()) {
                 TaskAttemptID mapId = mapIds.get(i);
                 Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
-                MapOutputFile localOutputFile = new MapOutputFile();
+                MapOutputFile localOutputFile = new MROutputFiles();
                 localOutputFile.setConf(localConf);
                 Path reduceIn =
                   localOutputFile.getInputFileForWrite(mapId.getTaskID(),

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Mar 17 18:43:03 2011
@@ -39,9 +39,9 @@ import org.apache.hadoop.mapreduce.MRCon
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MapOutputFile implements Configurable {
+public abstract class MapOutputFile implements Configurable {
 
-  private JobConf conf;
+  private Configuration conf;
 
   static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
   static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
@@ -50,177 +50,121 @@ public class MapOutputFile implements Co
   public MapOutputFile() {
   }
 
-  private LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator(MRConfig.LOCAL_DIR);
-  
   /**
    * Return the path to local map output file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
-  public Path getOutputFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, conf);
-  }
+  public abstract Path getOutputFile() throws IOException;
 
   /**
    * Create a local map output file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getOutputFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING, size, conf);
-  }
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
 
   /**
    * Create a local map output file name on the same volume.
    */
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(), MAP_OUTPUT_FILENAME_STRING);
-  }
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
 
   /**
    * Return the path to a local map output index file created earlier
-   * 
+   *
    * @return path
    * @throws IOException
    */
-  public Path getOutputIndexFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, conf);
-  }
+  public abstract Path getOutputIndexFile() throws IOException;
 
   /**
    * Create a local map output index file name.
-   * 
+   *
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getOutputIndexFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
-        + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
-        size, conf);
-  }
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
 
   /**
    * Create a local map output index file name on the same volume.
    */
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(),
-        MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING);
-  }
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
 
   /**
    * Return a local map spill file created earlier.
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
    */
-  public Path getSpillFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out", conf);
-  }
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
 
   /**
    * Create a local map spill file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out", size, conf);
-  }
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
 
   /**
    * Return a local map spill index file created earlier
-   * 
+   *
    * @param spillNumber the number
    * @return path
    * @throws IOException
    */
-  public Path getSpillIndexFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out.index", conf);
-  }
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
 
   /**
    * Create a local map spill index file name.
-   * 
+   *
    * @param spillNumber the number
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
-        + spillNumber + ".out.index", size, conf);
-  }
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
 
   /**
    * Return a local reduce input file created earlier
-   * 
+   *
    * @param mapId a map task id
    * @return path
-   * @throws IOException 
+   * @throws IOException
    */
-  public Path getInputFile(int mapId)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
-            .valueOf(mapId)), conf);
-  }
+  public abstract Path getInputFile(int mapId) throws IOException;
 
   /**
    * Create a local reduce input file name.
-   * 
+   *
    * @param mapId a map task id
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, 
-                                   long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
-        size, conf);
-  }
+  public abstract Path getInputFileForWrite(
+      org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException;
 
   /** Removes all of the files related to a task. */
-  public void removeAll()
-      throws IOException {
-    conf.deleteLocalFiles(TaskTracker.OUTPUT);
-  }
+  public abstract void removeAll() throws IOException;
 
   @Override
   public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
+    this.conf = conf;
   }
 
   @Override
   public Configuration getConf() {
     return conf;
   }
-  
+
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Mar 17 18:43:03 2011
@@ -407,7 +407,8 @@ public class ReduceTask extends Task {
                     shuffledMapsCounter,
                     reduceShuffleBytes, failedShuffleCounter,
                     mergedMapOutputsCounter,
-                    taskStatus, copyPhase, sortPhase, this);
+                    taskStatus, copyPhase, sortPhase, this,
+                    mapOutputFile);
       rIter = shuffle.run();
     } else {
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Thu Mar 17 18:43:03 2011
@@ -1187,7 +1187,7 @@ abstract public class Task implements Wr
     }
     this.mapOutputFile = ReflectionUtils.newInstance(
         conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
-          MapOutputFile.class, MapOutputFile.class), conf);
+          MROutputFiles.class, MapOutputFile.class), conf);
     this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
     // add the static resolutions (this is required for the junit to
     // work on testcases that simulate multiple nodes on a single physical

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Thu Mar 17 18:43:03 2011
@@ -133,7 +133,7 @@ public class MergeManager<K, V> {
                       Counters.Counter reduceCombineInputCounter,
                       Counters.Counter mergedMapOutputsCounter,
                       ExceptionReporter exceptionReporter,
-                      Progress mergePhase) {
+                      Progress mergePhase, MapOutputFile mapOutputFile) {
     this.reduceId = reduceId;
     this.jobConf = jobConf;
     this.localDirAllocator = localDirAllocator;
@@ -146,7 +146,7 @@ public class MergeManager<K, V> {
     this.reduceCombineInputCounter = reduceCombineInputCounter;
     this.spilledRecordsCounter = spilledRecordsCounter;
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new MapOutputFile();
+    this.mapOutputFile = mapOutputFile;
     this.mapOutputFile.setConf(jobConf);
     
     this.localFS = localFS;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Thu Mar 17 18:43:03 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -75,7 +76,8 @@ public class Shuffle<K, V> implements Ex
                  TaskStatus status,
                  Progress copyPhase,
                  Progress mergePhase,
-                 Task reduceTask) {
+                 Task reduceTask,
+                 MapOutputFile mapOutputFile) {
     this.reduceId = reduceId;
     this.jobConf = jobConf;
     this.umbilical = umbilical;
@@ -95,7 +97,7 @@ public class Shuffle<K, V> implements Ex
                                     spilledRecordsCounter, 
                                     reduceCombineInputCounter, 
                                     mergedMapOutputsCounter, 
-                                    this, mergePhase);
+                                    this, mergePhase, mapOutputFile);
   }
 
   @SuppressWarnings("unchecked")

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred-site.xml?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred-site.xml Thu Mar 17 18:43:03 2011
@@ -48,4 +48,8 @@
   <name>mapreduce.jobtracker.persist.jobstatus.active</name>
   <value>false</value>
 </property>
+<property>
+  <name>mapreduce.task.local.output.class</name>
+  <value>org.apache.hadoop.mapred.MROutputFiles</value>
+</property>
 </configuration>

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=1082631&r1=1082630&r2=1082631&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Thu Mar 17 18:43:03 2011
@@ -306,7 +306,7 @@ public class TestMapRed extends Configur
                        ) throws IOException {
       if (first) {
         first = false;
-        MapOutputFile mapOutputFile = new MapOutputFile();
+        MapOutputFile mapOutputFile = new MROutputFiles();
         mapOutputFile.setConf(conf);
         Path input = mapOutputFile.getInputFile(0);
         FileSystem fs = FileSystem.get(conf);



Mime
View raw message