hadoop-mapreduce-commits mailing list archives

From t...@apache.org
Subject svn commit: r1436936 [2/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-c...
Date Tue, 22 Jan 2013 14:10:43 GMT
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1436936&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan 22 14:10:42 2013
@@ -0,0 +1,797 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.mapred.Task.CombineValuesIterator;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
+@InterfaceAudience.LimitedPrivate({"MapReduce"})
+@InterfaceStability.Unstable
+public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
+  
+  private static final Log LOG = LogFactory.getLog(MergeManagerImpl.class);
+  
+  /* Maximum percentage of the in-memory limit that a single shuffle can
+   * consume. */
+  private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
+    = 0.25f;
+
+  private final TaskAttemptID reduceId;
+  
+  private final JobConf jobConf;
+  private final FileSystem localFS;
+  private final FileSystem rfs;
+  private final LocalDirAllocator localDirAllocator;
+  
+  protected MapOutputFile mapOutputFile;
+  
+  Set<InMemoryMapOutput<K, V>> inMemoryMergedMapOutputs = 
+    new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
+  private IntermediateMemoryToMemoryMerger memToMemMerger;
+
+  Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs = 
+    new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
+  private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
+  
+  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  private final OnDiskMerger onDiskMerger;
+  
+  private final long memoryLimit;
+  private long usedMemory;
+  private long commitMemory;
+  private final long maxSingleShuffleLimit;
+  
+  private final int memToMemMergeOutputsThreshold; 
+  private final long mergeThreshold;
+  
+  private final int ioSortFactor;
+
+  private final Reporter reporter;
+  private final ExceptionReporter exceptionReporter;
+  
+  /**
+   * Combiner class to run during in-memory merge, if defined.
+   */
+  private final Class<? extends Reducer> combinerClass;
+
+  /**
+   * Resettable collector used for combine.
+   */
+  private final CombineOutputCollector<K,V> combineCollector;
+
+  private final Counters.Counter spilledRecordsCounter;
+
+  private final Counters.Counter reduceCombineInputCounter;
+
+  private final Counters.Counter mergedMapOutputsCounter;
+  
+  private final CompressionCodec codec;
+  
+  private final Progress mergePhase;
+
+  public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, 
+                      FileSystem localFS,
+                      LocalDirAllocator localDirAllocator,  
+                      Reporter reporter,
+                      CompressionCodec codec,
+                      Class<? extends Reducer> combinerClass,
+                      CombineOutputCollector<K,V> combineCollector,
+                      Counters.Counter spilledRecordsCounter,
+                      Counters.Counter reduceCombineInputCounter,
+                      Counters.Counter mergedMapOutputsCounter,
+                      ExceptionReporter exceptionReporter,
+                      Progress mergePhase, MapOutputFile mapOutputFile) {
+    this.reduceId = reduceId;
+    this.jobConf = jobConf;
+    this.localDirAllocator = localDirAllocator;
+    this.exceptionReporter = exceptionReporter;
+    
+    this.reporter = reporter;
+    this.codec = codec;
+    this.combinerClass = combinerClass;
+    this.combineCollector = combineCollector;
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+    this.mapOutputFile = mapOutputFile;
+    this.mapOutputFile.setConf(jobConf);
+    
+    this.localFS = localFS;
+    this.rfs = ((LocalFileSystem)localFS).getRaw();
+    
+    final float maxInMemCopyUse =
+      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for " +
+          MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
+          maxInMemCopyUse);
+    }
+
+    // Allow unit tests to fix Runtime memory
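+    // The in-memory budget is a fraction (shuffle.input.buffer.percent)
+    // of REDUCE_MEMORY_TOTAL_BYTES if set (as the tests do), otherwise of
+    // the JVM max heap capped at Integer.MAX_VALUE.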
+    this.memoryLimit = 
+      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
+        * maxInMemCopyUse);
+ 
+    this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
+
+    final float singleShuffleMemoryLimitPercent =
+        jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
+            DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    usedMemory = 0L;
+    commitMemory = 0L;
+    this.maxSingleShuffleLimit = 
+      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
+    this.memToMemMergeOutputsThreshold = 
+            jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
+    this.mergeThreshold = (long)(this.memoryLimit * 
+                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 
+                                           0.90f));
+    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
+             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
+             "mergeThreshold=" + mergeThreshold + ", " + 
+             "ioSortFactor=" + ioSortFactor + ", " +
+             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
+
+    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
+      throw new RuntimeException("Invlaid configuration: "
+          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+          + "mergeThreshold: " + this.mergeThreshold);
+    }
+
+    boolean allowMemToMemMerge = 
+      jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
+    if (allowMemToMemMerge) {
+      this.memToMemMerger = 
+        new IntermediateMemoryToMemoryMerger(this,
+                                             memToMemMergeOutputsThreshold);
+      this.memToMemMerger.start();
+    } else {
+      this.memToMemMerger = null;
+    }
+    
+    this.inMemoryMerger = createInMemoryMerger();
+    this.inMemoryMerger.start();
+    
+    this.onDiskMerger = new OnDiskMerger(this);
+    this.onDiskMerger.start();
+    
+    this.mergePhase = mergePhase;
+  }
+  
+  protected MergeThread<InMemoryMapOutput<K,V>, K,V> createInMemoryMerger() {
+    return new InMemoryMerger(this);
+  }
+
+  TaskAttemptID getReduceId() {
+    return reduceId;
+  }
+
+  @VisibleForTesting
+  ExceptionReporter getExceptionReporter() {
+    return exceptionReporter;
+  }
+
+  @Override
+  public void waitForResource() throws InterruptedException {
+    inMemoryMerger.waitForMerge();
+  }
+  
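+  // Map outputs below the single-shuffle limit are fetched into memory;
+  // anything at or above it is shuffled straight to disk.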
+  private boolean canShuffleToMemory(long requestedSize) {
+    return (requestedSize < maxSingleShuffleLimit); 
+  }
+  
+  @Override
+  public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
+                                             long requestedSize,
+                                             int fetcher
+                                             ) throws IOException {
+    if (!canShuffleToMemory(requestedSize)) {
+      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
+               " is greater than maxSingleShuffleLimit (" + 
+               maxSingleShuffleLimit + ")");
+      return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
+                                      jobConf, mapOutputFile, fetcher, true);
+    }
+    
+    // Stall shuffle if we are above the memory limit
+
+    // It is possible that all threads could just be stalling and not make
+    // progress at all. This could happen when:
+    //
+    // requested size is causing the used memory to go above limit &&
+    // requested size < singleShuffleLimit &&
+    // current used size < mergeThreshold (merge will not get triggered)
+    //
+    // To avoid this from happening, we allow exactly one thread to go past
+    // the memory limit. We check (usedMemory > memoryLimit) and not
+    // (usedMemory + requestedSize > memoryLimit). When this thread is done
+    // fetching, this will automatically trigger a merge thereby unlocking
+    // all the stalled threads
+    
+    if (usedMemory > memoryLimit) {
+      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
+          " CommitMemory is (" + commitMemory + ")"); 
+      return null;
+    }
+    
+    // Allow the in-memory shuffle to progress
+    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+        + usedMemory + ") is less than memoryLimit (" + memoryLimit + ")."
+        + " CommitMemory is (" + commitMemory + ")");
+    return unconditionalReserve(mapId, requestedSize, true);
+  }
+  
+  /**
+   * Unconditional reserve is used by the memory-to-memory merger thread.
+   * @return an in-memory map output backed by the requested amount of memory
+   */
+  private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
+      TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
+    usedMemory += requestedSize;
+    return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
+                                      codec, primaryMapOutput);
+  }
+  
+  synchronized void unreserve(long size) {
+    usedMemory -= size;
+  }
+
+  public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) { 
+    inMemoryMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+
+    commitMemory+= mapOutput.getSize();
+
+    // Can hang if mergeThreshold is really low.
+    if (commitMemory >= mergeThreshold) {
+      LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+          commitMemory + " >= mergeThreshold=" + mergeThreshold + 
+          ". Current usedMemory=" + usedMemory);
+      inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+      inMemoryMergedMapOutputs.clear();
+      inMemoryMerger.startMerge(inMemoryMapOutputs);
+      commitMemory = 0L;  // Reset commitMemory.
+    }
+    
+    if (memToMemMerger != null) {
+      if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { 
+        memToMemMerger.startMerge(inMemoryMapOutputs);
+      }
+    }
+  }
+  
+  
+  public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutput) {
+    inMemoryMergedMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
+             ", inMemoryMergedMapOutputs.size() -> " + 
+             inMemoryMergedMapOutputs.size());
+  }
+  
+  public synchronized void closeOnDiskFile(Path file) {
+    onDiskMapOutputs.add(file);
+    
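+    // 2 * ioSortFactor - 1 is the largest file count for which a single
+    // merge pass over ioSortFactor files brings the number of on-disk
+    // outputs back down to ioSortFactor.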
+    if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+      onDiskMerger.startMerge(onDiskMapOutputs);
+    }
+  }
+  
+  @Override
+  public RawKeyValueIterator close() throws Throwable {
+    // Wait for on-going merges to complete
+    if (memToMemMerger != null) { 
+      memToMemMerger.close();
+    }
+    inMemoryMerger.close();
+    onDiskMerger.close();
+    
+    List<InMemoryMapOutput<K, V>> memory = 
+      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
+    memory.addAll(inMemoryMapOutputs);
+    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+    return finalMerge(jobConf, rfs, memory, disk);
+  }
+   
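+  /**
+   * Merges several in-memory map outputs into a single larger in-memory
+   * output, cutting the number of segments without touching disk.
+   */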
+  private class IntermediateMemoryToMemoryMerger 
+  extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
+    
+    public IntermediateMemoryToMemoryMerger(MergeManagerImpl<K, V> manager, 
+                                            int mergeFactor) {
+      super(manager, mergeFactor, exceptionReporter);
+      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
+      		    "shuffled map-outputs");
+      setDaemon(true);
+    }
+
+    @Override
+    public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+
+      TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
+      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments, 0);
+      int noInMemorySegments = inMemorySegments.size();
+      
+      InMemoryMapOutput<K, V> mergedMapOutputs = 
+        unconditionalReserve(dummyMapId, mergeOutputSize, false);
+      
+      Writer<K, V> writer = 
+        new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
+      
+      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
+               " segments of total-size: " + mergeOutputSize);
+
+      RawKeyValueIterator rIter = 
+        Merger.merge(jobConf, rfs,
+                     (Class<K>)jobConf.getMapOutputKeyClass(),
+                     (Class<V>)jobConf.getMapOutputValueClass(),
+                     inMemorySegments, inMemorySegments.size(),
+                     new Path(reduceId.toString()),
+                     (RawComparator<K>)jobConf.getOutputKeyComparator(),
+                     reporter, null, null, null);
+      Merger.writeFile(rIter, writer, reporter, jobConf);
+      writer.close();
+
+      LOG.info(reduceId +  
+               " Memory-to-Memory merge of the " + noInMemorySegments +
+               " files in-memory complete.");
+
+      // Note the output of the merge
+      closeInMemoryMergedFile(mergedMapOutputs);
+    }
+  }
+  
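+  /**
+   * Merges in-memory map outputs and spills the result to disk, running
+   * the combiner on the way out if one is configured.
+   */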
+  private class InMemoryMerger extends MergeThread<InMemoryMapOutput<K,V>, K,V> {
+    
+    public InMemoryMerger(MergeManagerImpl<K, V> manager) {
+      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      setName("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
+      setDaemon(true);
+    }
+    
+    @Override
+    public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
+      if (inputs == null || inputs.size() == 0) {
+        return;
+      }
+      
+      //Name this output file after the first file in the current list of
+      //in-memory files; that name is guaranteed to be absent on disk right
+      //now, so we don't overwrite a previously created spill. We also need
+      //to create the output file now, since this file is not guaranteed to
+      //be present after merge is called (we delete empty files as soon as
+      //we see them in the merge method).
+
+      //figure out the mapId 
+      TaskAttemptID mapId = inputs.get(0).getMapId();
+      TaskID mapTaskId = mapId.getTaskID();
+
+      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
+      long mergeOutputSize = 
+        createInMemorySegments(inputs, inMemorySegments, 0);
+      int noInMemorySegments = inMemorySegments.size();
+
+      Path outputPath = 
+        mapOutputFile.getInputFileForWrite(mapTaskId,
+                                           mergeOutputSize).suffix(
+                                               Task.MERGED_OUTPUT_PREFIX);
+
+      Writer<K,V> writer = 
+        new Writer<K,V>(jobConf, rfs, outputPath,
+                        (Class<K>) jobConf.getMapOutputKeyClass(),
+                        (Class<V>) jobConf.getMapOutputValueClass(),
+                        codec, null);
+
+      RawKeyValueIterator rIter = null;
+      try {
+        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
+                 " segments...");
+        
+        rIter = Merger.merge(jobConf, rfs,
+                             (Class<K>)jobConf.getMapOutputKeyClass(),
+                             (Class<V>)jobConf.getMapOutputValueClass(),
+                             inMemorySegments, inMemorySegments.size(),
+                             new Path(reduceId.toString()),
+                             (RawComparator<K>)jobConf.getOutputKeyComparator(),
+                             reporter, spilledRecordsCounter, null, null);
+        
+        if (null == combinerClass) {
+          Merger.writeFile(rIter, writer, reporter, jobConf);
+        } else {
+          combineCollector.setWriter(writer);
+          combineAndSpill(rIter, reduceCombineInputCounter);
+        }
+        writer.close();
+
+        LOG.info(reduceId +  
+            " Merge of the " + noInMemorySegments +
+            " files in-memory complete." +
+            " Local file is " + outputPath + " of size " + 
+            localFS.getFileStatus(outputPath).getLen());
+      } catch (IOException e) { 
+        //make sure that we delete the on-disk file that we created
+        //earlier for the merged output
+        localFS.delete(outputPath, true);
+        throw e;
+      }
+
+      // Note the output of the merge
+      closeOnDiskFile(outputPath);
+    }
+
+  }
+  
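+  /**
+   * Merges accumulated on-disk map outputs into a single larger file so
+   * the number of on-disk segments stays bounded.
+   */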
+  private class OnDiskMerger extends MergeThread<Path,K,V> {
+    
+    public OnDiskMerger(MergeManagerImpl<K, V> manager) {
+      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
+      setDaemon(true);
+    }
+    
+    @Override
+    public void merge(List<Path> inputs) throws IOException {
+      // sanity check
+      if (inputs == null || inputs.isEmpty()) {
+        LOG.info("No ondisk files to merge...");
+        return;
+      }
+      
+      long approxOutputSize = 0;
+      int bytesPerSum = 
+        jobConf.getInt("io.bytes.per.checksum", 512);
+      
+      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
+               " map outputs on disk. Triggering merge...");
+      
+      // 1. Prepare the list of files to be merged. 
+      for (Path file : inputs) {
+        approxOutputSize += localFS.getFileStatus(file).getLen();
+      }
+
+      // add the checksum length
+      approxOutputSize += 
+        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
+
+      // 2. Start the on-disk merge process
+      Path outputPath = 
+        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
+            approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
+      Writer<K,V> writer = 
+        new Writer<K,V>(jobConf, rfs, outputPath, 
+                        (Class<K>) jobConf.getMapOutputKeyClass(), 
+                        (Class<V>) jobConf.getMapOutputValueClass(),
+                        codec, null);
+      RawKeyValueIterator iter  = null;
+      Path tmpDir = new Path(reduceId.toString());
+      try {
+        iter = Merger.merge(jobConf, rfs,
+                            (Class<K>) jobConf.getMapOutputKeyClass(),
+                            (Class<V>) jobConf.getMapOutputValueClass(),
+                            codec, inputs.toArray(new Path[inputs.size()]), 
+                            true, ioSortFactor, tmpDir, 
+                            (RawComparator<K>) jobConf.getOutputKeyComparator(), 
+                            reporter, spilledRecordsCounter, null, 
+                            mergedMapOutputsCounter, null);
+
+        Merger.writeFile(iter, writer, reporter, jobConf);
+        writer.close();
+      } catch (IOException e) {
+        localFS.delete(outputPath, true);
+        throw e;
+      }
+
+      closeOnDiskFile(outputPath);
+
+      LOG.info(reduceId +
+          " Finished merging " + inputs.size() + 
+          " map output files on disk of total-size " + 
+          approxOutputSize + "." + 
+          " Local output file is " + outputPath + " of size " +
+          localFS.getFileStatus(outputPath).getLen());
+    }
+  }
+  
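+  /**
+   * Runs the configured combiner over the merged key/value stream; the
+   * combiner's output reaches the on-disk writer via combineCollector.
+   */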
+  private void combineAndSpill(
+      RawKeyValueIterator kvIter,
+      Counters.Counter inCounter) throws IOException {
+    JobConf job = jobConf;
+    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
+    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
+    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
+    RawComparator<K> comparator = 
+      (RawComparator<K>)job.getOutputKeyComparator();
+    try {
+      CombineValuesIterator values = new CombineValuesIterator(
+          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+          inCounter);
+      while (values.more()) {
+        combiner.reduce(values.getKey(), values, combineCollector,
+                        Reporter.NULL);
+        values.nextKey();
+      }
+    } finally {
+      combiner.close();
+    }
+  }
+
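+  /**
+   * Drains in-memory map outputs into segments until at most leaveBytes
+   * bytes remain buffered in memory; returns the total bytes drained.
+   */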
+  private long createInMemorySegments(List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+                                      List<Segment<K, V>> inMemorySegments, 
+                                      long leaveBytes
+                                      ) throws IOException {
+    long totalSize = 0L;
+    // fullSize could come from the RamManager, but files can be closed
+    // without yet being present in inMemoryMapOutputs
+    long fullSize = 0L;
+    for (InMemoryMapOutput<K,V> mo : inMemoryMapOutputs) {
+      fullSize += mo.getMemory().length;
+    }
+    while(fullSize > leaveBytes) {
+      InMemoryMapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
+      byte[] data = mo.getMemory();
+      long size = data.length;
+      totalSize += size;
+      fullSize -= size;
+      Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this, 
+                                                   mo.getMapId(),
+                                                   data, 0, (int)size);
+      inMemorySegments.add(new Segment<K,V>(reader, true, 
+                                            (mo.isPrimaryMapOutput() ? 
+                                            mergedMapOutputsCounter : null)));
+    }
+    return totalSize;
+  }
+
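+  /**
+   * Adapts a RawKeyValueIterator to the IFile.Reader interface so the
+   * result of the on-disk merge can be wrapped in a Segment and included
+   * in the final merge.
+   */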
+  class RawKVIteratorReader extends IFile.Reader<K,V> {
+
+    private final RawKeyValueIterator kvIter;
+
+    public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
+        throws IOException {
+      super(null, null, size, null, spilledRecordsCounter);
+      this.kvIter = kvIter;
+    }
+    public boolean nextRawKey(DataInputBuffer key) throws IOException {
+      if (kvIter.next()) {
+        final DataInputBuffer kb = kvIter.getKey();
+        final int kp = kb.getPosition();
+        final int klen = kb.getLength() - kp;
+        key.reset(kb.getData(), kp, klen);
+        bytesRead += klen;
+        return true;
+      }
+      return false;
+    }
+    public void nextRawValue(DataInputBuffer value) throws IOException {
+      final DataInputBuffer vb = kvIter.getValue();
+      final int vp = vb.getPosition();
+      final int vlen = vb.getLength() - vp;
+      value.reset(vb.getData(), vp, vlen);
+      bytesRead += vlen;
+    }
+    public long getPosition() throws IOException {
+      return bytesRead;
+    }
+
+    public void close() throws IOException {
+      kvIter.close();
+    }
+  }
+
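+  /**
+   * Produces the single sorted iterator handed to the reduce: spills
+   * in-memory outputs beyond the reduce-input buffer to disk, merges the
+   * on-disk files, and stacks any remaining in-memory segments on top.
+   */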
+  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+                                       List<Path> onDiskMapOutputs
+                                       ) throws IOException {
+    LOG.info("finalMerge called with " + 
+             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
+             onDiskMapOutputs.size() + " on-disk map-outputs");
+    
+    final float maxRedPer =
+      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      throw new IOException("Invalid value for " +
+          MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + ": " + maxRedPer);
+    }
+    int maxInMemReduce = (int)Math.min(
+        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
+    
+
+    // merge config params
+    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
+    boolean keepInputs = job.getKeepFailedTaskFiles();
+    final Path tmpDir = new Path(reduceId.toString());
+    final RawComparator<K> comparator =
+      (RawComparator<K>)job.getOutputKeyComparator();
+
+    // segments required to vacate memory
+    List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
+    long inMemToDiskBytes = 0;
+    boolean mergePhaseFinished = false;
+    if (inMemoryMapOutputs.size() > 0) {
+      TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
+      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
+                                                memDiskSegments,
+                                                maxInMemReduce);
+      final int numMemDiskSegments = memDiskSegments.size();
+      if (numMemDiskSegments > 0 &&
+            ioSortFactor > onDiskMapOutputs.size()) {
+        
+        // If we reach here, it implies that we have less than io.sort.factor
+        // disk segments and this will be incremented by 1 (result of the 
+        // memory segments merge). Since this total would still be 
+        // <= io.sort.factor, we will not do any more intermediate merges,
+        // the merge of all these disk segments would be directly fed to the
+        // reduce method
+        
+        mergePhaseFinished = true;
+        // must spill to disk, but can't retain in-mem for intermediate merge
+        final Path outputPath = 
+          mapOutputFile.getInputFileForWrite(mapId,
+                                             inMemToDiskBytes).suffix(
+                                                 Task.MERGED_OUTPUT_PREFIX);
+        final RawKeyValueIterator rIter = Merger.merge(job, fs,
+            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
+            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
+            mergePhase);
+        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+            keyClass, valueClass, codec, null);
+        try {
+          Merger.writeFile(rIter, writer, reporter, job);
+          // add to list of final disk outputs.
+          onDiskMapOutputs.add(outputPath);
+        } catch (IOException e) {
+          if (null != outputPath) {
+            try {
+              fs.delete(outputPath, true);
+            } catch (IOException ie) {
+              // NOTHING
+            }
+          }
+          throw e;
+        } finally {
+          if (null != writer) {
+            writer.close();
+          }
+        }
+        LOG.info("Merged " + numMemDiskSegments + " segments, " +
+                 inMemToDiskBytes + " bytes to disk to satisfy " +
+                 "reduce memory limit");
+        inMemToDiskBytes = 0;
+        memDiskSegments.clear();
+      } else if (inMemToDiskBytes != 0) {
+        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
+                 inMemToDiskBytes + " bytes in memory for " +
+                 "intermediate, on-disk merge");
+      }
+    }
+
+    // segments on disk
+    List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
+    long onDiskBytes = inMemToDiskBytes;
+    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
+    for (Path file : onDisk) {
+      onDiskBytes += fs.getFileStatus(file).getLen();
+      LOG.debug("Disk file: " + file + " Length is " + 
+          fs.getFileStatus(file).getLen());
+      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
+                                         (file.toString().endsWith(
+                                             Task.MERGED_OUTPUT_PREFIX) ?
+                                          null : mergedMapOutputsCounter)
+                                        ));
+    }
+    LOG.info("Merging " + onDisk.length + " files, " +
+             onDiskBytes + " bytes from disk");
+    Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
+      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+        if (o1.getLength() == o2.getLength()) {
+          return 0;
+        }
+        return o1.getLength() < o2.getLength() ? -1 : 1;
+      }
+    });
+
+    // build the final list of segments: the merged disk output plus any
+    // remaining in-memory segments
+    List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
+    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
+                                             finalSegments, 0);
+    LOG.info("Merging " + finalSegments.size() + " segments, " +
+             inMemBytes + " bytes from memory into reduce");
+    if (0 != onDiskBytes) {
+      final int numInMemSegments = memDiskSegments.size();
+      diskSegments.addAll(0, memDiskSegments);
+      memDiskSegments.clear();
+      // Pass mergePhase only if there are going to be intermediate
+      // merges. See the comment where mergePhaseFinished is set.
+      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
+      RawKeyValueIterator diskMerge = Merger.merge(
+          job, fs, keyClass, valueClass, diskSegments,
+          ioSortFactor, numInMemSegments, tmpDir, comparator,
+          reporter, false, spilledRecordsCounter, null, thisPhase);
+      diskSegments.clear();
+      if (0 == finalSegments.size()) {
+        return diskMerge;
+      }
+      finalSegments.add(new Segment<K,V>(
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+    }
+    return Merger.merge(job, fs, keyClass, valueClass,
+                 finalSegments, finalSegments.size(), tmpDir,
+                 comparator, reporter, spilledRecordsCounter, null,
+                 null);
+  
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Tue Jan 22 14:10:42 2013
@@ -34,12 +34,12 @@ abstract class MergeThread<T,K,V> extend
 
   private AtomicInteger numPending = new AtomicInteger(0);
   private LinkedList<List<T>> pendingToBeMerged;
-  protected final MergeManager<K,V> manager;
+  protected final MergeManagerImpl<K,V> manager;
   private final ExceptionReporter reporter;
   private boolean closed = false;
   private final int mergeFactor;
   
-  public MergeThread(MergeManager<K,V> manager, int mergeFactor,
+  public MergeThread(MergeManagerImpl<K,V> manager, int mergeFactor,
                      ExceptionReporter reporter) {
     this.pendingToBeMerged = new LinkedList<List<T>>();
     this.manager = manager;

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1436936&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Tue Jan 22 14:10:42 2013
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapOutputFile;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
+  private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
+  private final FileSystem localFS;
+  private final Path tmpOutputPath;
+  private final Path outputPath;
+  private final MergeManagerImpl<K, V> merger;
+  private final OutputStream disk; 
+
+  public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
+                         MergeManagerImpl<K, V> merger, long size,
+                         JobConf conf,
+                         MapOutputFile mapOutputFile,
+                         int fetcher, boolean primaryMapOutput)
+    throws IOException {
+    super(mapId, size, primaryMapOutput);
+    this.merger = merger;
+    this.localFS = FileSystem.getLocal(conf);
+    outputPath =
+        mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
+    
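+    // Fetch into a fetcher-specific temporary file; commit() renames it
+    // to the final map output location once the copy succeeds.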
+    disk = localFS.create(tmpOutputPath);
+
+  }
+
+  @Override
+  public void shuffle(MapHost host, InputStream input,
+                      long compressedLength, long decompressedLength,
+                      ShuffleClientMetrics metrics,
+                      Reporter reporter) throws IOException {
+    // Copy data to local-disk
+    long bytesLeft = compressedLength;
+    try {
+      final int BYTES_TO_READ = 64 * 1024;
+      byte[] buf = new byte[BYTES_TO_READ];
+      while (bytesLeft > 0) {
+        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        if (n < 0) {
+          throw new IOException("read past end of stream reading " + 
+                                getMapId());
+        }
+        disk.write(buf, 0, n);
+        bytesLeft -= n;
+        metrics.inputBytes(n);
+        reporter.progress();
+      }
+
+      LOG.info("Read " + (compressedLength - bytesLeft) + 
+               " bytes from map-output for " + getMapId());
+
+      disk.close();
+    } catch (IOException ioe) {
+      // Close the streams
+      IOUtils.cleanup(LOG, input, disk);
+
+      // Re-throw
+      throw ioe;
+    }
+
+    // Sanity check
+    if (bytesLeft != 0) {
+      throw new IOException("Incomplete map output received for " +
+                            getMapId() + " from " +
+                            host.getHostName() + " (" + 
+                            bytesLeft + " bytes missing of " + 
+                            compressedLength + ")");
+    }
+  }
+
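+  /** Renames the completed temporary file into place and hands the
+   *  on-disk output to the merger. */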
+  @Override
+  public void commit() throws IOException {
+    localFS.rename(tmpOutputPath, outputPath);
+    merger.closeOnDiskFile(outputPath);
+  }
+  
+  @Override
+  public void abort() {
+    try {
+      localFS.delete(tmpOutputPath, false);
+    } catch (IOException ie) {
+      LOG.info("failure to clean up " + tmpOutputPath, ie);
+    }
+  }
+
+  @Override
+  public String getDescription() {
+    return "DISK";
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Jan 22 14:10:42 2013
@@ -21,17 +21,10 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
@@ -77,17 +70,21 @@ public class Shuffle<K, V> implements Sh
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
     
-    scheduler = 
-      new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase, 
-                                context.getShuffledMapsCounter(), 
-                                context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
-    merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
-                                    context.getLocalDirAllocator(), reporter, context.getCodec(),
-                                    context.getCombinerClass(), context.getCombineCollector(), 
-                                    context.getSpilledRecordsCounter(), 
-                                    context.getReduceCombineInputCounter(), 
-                                    context.getMergedMapOutputsCounter(), 
-                                    this, context.getMergePhase(), context.getMapOutputFile());
+    scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this,
+        copyPhase, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+    merger = createMergeManager(context);
+  }
+
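+  // Factory method so alternate shuffle implementations can substitute
+  // their own MergeManager.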
+  protected MergeManager<K, V> createMergeManager(
+      ShuffleConsumerPlugin.Context context) {
+    return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
+        context.getLocalDirAllocator(), reporter, context.getCodec(),
+        context.getCombinerClass(), context.getCombineCollector(), 
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+        context.getMapOutputFile());
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Tue Jan 22 14:10:42 2013
@@ -53,7 +53,7 @@ public class TestFetcher {
     private HttpURLConnection connection;
 
     public FakeFetcher(JobConf job, TaskAttemptID reduceId,
-        ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter,
+        ShuffleScheduler<K,V> scheduler, MergeManagerImpl<K,V> merger, Reporter reporter,
         ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
         SecretKey jobTokenSecret, HttpURLConnection connection) {
       super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
@@ -77,7 +77,7 @@ public class TestFetcher {
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
     ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
-    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     ExceptionReporter except = mock(ExceptionReporter.class);
@@ -132,7 +132,7 @@ public class TestFetcher {
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
     ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
-    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     ExceptionReporter except = mock(ExceptionReporter.class);
@@ -167,10 +167,9 @@ public class TestFetcher {
     header.write(new DataOutputStream(bout));
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
-    //Defaults to WAIT, which is what we want to test
-    MapOutput<Text,Text> mapOut = new MapOutput<Text, Text>(map1ID);
+    //Defaults to null, which is what we want to test
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
-      .thenReturn(mapOut);
+      .thenReturn(null);
     
     underTest.copyFromHost(host);
     

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Tue Jan 22 14:10:42 2013
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
+  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -55,45 +55,47 @@ public class TestMergeManager {
 
     // reserve enough map output to cause a merge when it is committed
     MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out1.getType());
-    fillOutput(out1);
+    Assert.assertTrue("Should be a memory merge",
+                      (out1 instanceof InMemoryMapOutput));
+    InMemoryMapOutput<Text, Text> mout1 = (InMemoryMapOutput<Text, Text>)out1;
+    fillOutput(mout1);
     MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out2.getType());
-    fillOutput(out2);
+    Assert.assertTrue("Should be a memory merge",
+                      (out2 instanceof InMemoryMapOutput));
+    InMemoryMapOutput<Text, Text> mout2 = (InMemoryMapOutput<Text, Text>)out2;
+    fillOutput(mout2);
 
     // next reservation should be a WAIT
     MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be told to wait",
-        Type.WAIT, out3.getType());
+    Assert.assertEquals("Should be told to wait", null, out3);
 
     // trigger the first merge and wait for merge thread to start merging
     // and free enough output to reserve more
-    out1.commit();
-    out2.commit();
+    mout1.commit();
+    mout2.commit();
     mergeStart.await();
 
     Assert.assertEquals(1, mgr.getNumMerges());
 
     // reserve enough map output to cause another merge when committed
     out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out1.getType());
-    fillOutput(out1);
+    Assert.assertTrue("Should be a memory merge",
+                       (out1 instanceof InMemoryMapOutput));
+    mout1 = (InMemoryMapOutput<Text, Text>)out1;
+    fillOutput(mout1);
     out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out2.getType());
-    fillOutput(out2);
+    Assert.assertTrue("Should be a memory merge",
+                       (out2 instanceof InMemoryMapOutput));
+    mout2 = (InMemoryMapOutput<Text, Text>)out2;
+    fillOutput(mout2);
 
-    // next reservation should be a WAIT
+    // next reservation should be null
     out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be told to wait",
-        Type.WAIT, out3.getType());
+    Assert.assertEquals("Should be told to wait", null, out3);
 
     // commit output *before* merge thread completes
-    out1.commit();
-    out2.commit();
+    mout1.commit();
+    mout2.commit();
 
     // allow the first merge to complete
     mergeComplete.await();
@@ -110,7 +112,7 @@ public class TestMergeManager {
         0, reporter.getNumExceptions());
   }
 
-  private void fillOutput(MapOutput<Text, Text> output) throws IOException {
+  private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException {
     BoundedByteArrayOutputStream stream = output.getArrayStream();
     int count = stream.getLimit();
     for (int i=0; i < count; ++i) {
@@ -118,7 +120,7 @@ public class TestMergeManager {
     }
   }
 
-  private static class StubbedMergeManager extends MergeManager<Text, Text> {
+  private static class StubbedMergeManager extends MergeManagerImpl<Text, Text> {
     private TestMergeThread mergeThread;
 
     public StubbedMergeManager(JobConf conf, ExceptionReporter reporter,
@@ -129,7 +131,7 @@ public class TestMergeManager {
     }
 
     @Override
-    protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
+    protected MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
       mergeThread = new TestMergeThread(this, getExceptionReporter());
       return mergeThread;
     }
@@ -140,12 +142,12 @@ public class TestMergeManager {
   }
 
   private static class TestMergeThread
-  extends MergeThread<MapOutput<Text,Text>, Text, Text> {
+  extends MergeThread<InMemoryMapOutput<Text,Text>, Text, Text> {
     private AtomicInteger numMerges;
     private CyclicBarrier mergeStart;
     private CyclicBarrier mergeComplete;
 
-    public TestMergeThread(MergeManager<Text, Text> mergeManager,
+    public TestMergeThread(MergeManagerImpl<Text, Text> mergeManager,
         ExceptionReporter reporter) {
       super(mergeManager, Integer.MAX_VALUE, reporter);
       numMerges = new AtomicInteger(0);
@@ -162,11 +164,11 @@ public class TestMergeManager {
     }
 
     @Override
-    public void merge(List<MapOutput<Text, Text>> inputs)
+    public void merge(List<InMemoryMapOutput<Text, Text>> inputs)
         throws IOException {
       synchronized (this) {
         numMerges.incrementAndGet();
-        for (MapOutput<Text, Text> input : inputs) {
+        for (InMemoryMapOutput<Text, Text> input : inputs) {
           manager.unreserve(input.getSize());
         }
       }


