hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1450915 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/...
Date Wed, 27 Feb 2013 18:53:04 GMT
Author: vinodkv
Date: Wed Feb 27 18:53:03 2013
New Revision: 1450915

URL: http://svn.apache.org/r1450915
Log:
MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input slits' allocation on small
clusters. Contributed by Bikas Saha.
svn merge --ignore-ancestry -c 1450912 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1450915&r1=1450914&r2=1450915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Feb 27 18:53:03
2013
@@ -11,6 +11,9 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-5033. mapred shell script should respect usage flags
     (--help -help -h). (Andrew Wang via atm)
 
+    MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input slits'
+    allocation on small clusters. (Bikas Saha via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1450915&r1=1450914&r2=1450915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
Wed Feb 27 18:53:03 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
  * {@link InputFormat#getSplits(JobContext)} method. 
@@ -76,7 +78,7 @@ import org.apache.hadoop.net.NetworkTopo
 @InterfaceStability.Stable
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
-
+  
   public static final String SPLIT_MINSIZE_PERNODE = 
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK = 
@@ -163,7 +165,6 @@ public abstract class CombineFileInputFo
   @Override
   public List<InputSplit> getSplits(JobContext job) 
     throws IOException {
-
     long minSizeNode = 0;
     long minSizeRack = 0;
     long maxSize = 0;
@@ -286,56 +287,100 @@ public abstract class CombineFileInputFo
                                  rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
+    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
+                 maxSize, minSizeNode, minSizeRack, splits);
+  }
 
+  @VisibleForTesting
+  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                     HashMap<OneBlockInfo, String[]> blockToNodes,
+                     HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                     long totLength,
+                     long maxSize,
+                     long minSizeNode,
+                     long minSizeRack,
+                     List<InputSplit> splits                     
+                    ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
     Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
+    
+    int numNodes = nodeToBlocks.size();
+    long totalLength = totLength;
 
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
+    while(true) {
+      // it is allowed for maxSize to be 0. Disable smoothing load for such cases
+      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
+                                        ((int) (totalLength/maxSize))/numNodes
+                                        : Integer.MAX_VALUE;
+      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+      numNodes = 0;
+
+      // process all nodes and create splits that are local to a node.
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        nodes.add(one.getKey());
+        List<OneBlockInfo> blocksInNode = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        int splitsInNode = 0;
+        for (OneBlockInfo oneblock : blocksInNode) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, nodes, validBlocks);
+              totalLength -= curSplitSize;
+              curSplitSize = 0;
+              validBlocks.clear();
+              splitsInNode++;
+              if (splitsInNode == maxSplitsByNodeOnly) {
+                // stop grouping on a node so as not to create
+                // disproportionately more splits on a node because it happens
+                // to have many blocks
+                // consider only these nodes in next round of grouping because
+                // they have leftover blocks that may need to be grouped
+                numNodes++;
+                break;
+              }
+            }
           }
         }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the 
-      // same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
+        // if there were any blocks left over and their combined size is
+        // larger than minSplitNode, then combine them into one split.
+        // Otherwise add them back to the unprocessed pool. It is likely
+        // that they will be combined with other blocks from the
+        // same rack later on.
+        if (minSizeNode != 0 && curSplitSize >= minSizeNode
+            && splitsInNode == 0) {
+          // haven't created any split on this machine. so its ok to add a
+          // smaller
+          // one for parallelism. Otherwise group it in the rack for balanced
+          // size
+          // create an input split and add it to the splits array
+          addCreatedSplit(splits, nodes, validBlocks);
+          totalLength -= curSplitSize;
+        } else {
+          for (OneBlockInfo oneblock : validBlocks) {
+            blockToNodes.put(oneblock, oneblock.hosts);
+          }
         }
+        validBlocks.clear();
+        nodes.clear();
+        curSplitSize = 0;
+      }
+      
+      if(!(numNodes>0 && totalLength>0)) {
+        break;
       }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
     }
 
     // if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ public abstract class CombineFileInputFo
       offset[i] = validBlocks.get(i).offset;
       length[i] = validBlocks.get(i).length;
     }
-
      // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                    length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one file from the File System
    */
-  private static class OneFileInfo {
+  @VisibleForTesting
+  static class OneFileInfo {
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
@@ -545,45 +590,55 @@ public abstract class CombineFileInputFo
           }
           blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
         }
+        
+        populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                          nodeToBlocks, rackToNodes);
+      }
+    }
+    
+    @VisibleForTesting
+    static void populateBlockInfo(OneBlockInfo[] blocks,
+                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                          HashMap<OneBlockInfo, String[]> blockToNodes,
+                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                          HashMap<String, Set<String>> rackToNodes) {
+      for (OneBlockInfo oneblock : blocks) {
+        // add this block to the block --> node locations map
+        blockToNodes.put(oneblock, oneblock.hosts);
+
+        // For blocks that do not have host/rack information,
+        // assign to default  rack.
+        String[] racks = null;
+        if (oneblock.hosts.length == 0) {
+          racks = new String[]{NetworkTopology.DEFAULT_RACK};
+        } else {
+          racks = oneblock.racks;
+        }
 
-        for (OneBlockInfo oneblock : blocks) {
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // For blocks that do not have host/rack information,
-          // assign to default  rack.
-          String[] racks = null;
-          if (oneblock.hosts.length == 0) {
-            racks = new String[]{NetworkTopology.DEFAULT_RACK};
-          } else {
-            racks = oneblock.racks;
+        // add this block to the rack --> block map
+        for (int j = 0; j < racks.length; j++) {
+          String rack = racks[j];
+          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            rackToBlocks.put(rack, blklist);
           }
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < racks.length; j++) {
-            String rack = racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
-            }
+          blklist.add(oneblock);
+          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
           }
+        }
 
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
+        // add this block to the node --> block map
+        for (int j = 0; j < oneblock.hosts.length; j++) {
+          String node = oneblock.hosts[j];
+          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            nodeToBlocks.put(node, blklist);
           }
+          blklist.add(oneblock);
         }
       }
     }
@@ -600,7 +655,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one block from the File System
    */
-  private static class OneBlockInfo {
+  @VisibleForTesting
+  static class OneBlockInfo {
     Path onepath;                // name of this file
     long offset;                 // offset in file
     long length;                 // length of this block

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1450915&r1=1450914&r2=1450915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Wed Feb 27 18:53:03 2013
@@ -20,11 +20,14 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.concurrent.TimeoutException;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.*;
@@ -42,9 +45,13 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
+import com.google.common.collect.HashMultiset;
+
 public class TestCombineFileInputFormat extends TestCase {
 
   private static final String rack1[] = new String[] {
@@ -476,23 +483,23 @@ public class TestCombineFileInputFormat 
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(2);
       assertEquals(2, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
       assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
       // maximum split size is 3 blocks 
       inFormat = new DummyInputFormat();
@@ -504,7 +511,7 @@ public class TestCombineFileInputFormat 
       for (InputSplit split : splits) {
         System.out.println("File split(Test5): " + split);
       }
-      assertEquals(4, splits.size());
+      assertEquals(3, splits.size());
       fileSplit = (CombineFileSplit) splits.get(0);
       assertEquals(3, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
@@ -519,32 +526,28 @@ public class TestCombineFileInputFormat 
       assertEquals(BLOCKSIZE, fileSplit.getLength(2));
       assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
       assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-      assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2));
+      assertEquals(0, fileSplit.getOffset(2));
       assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(3, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
       assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(3);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+      assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
       assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
       // maximum split size is 4 blocks 
@@ -713,6 +716,56 @@ public class TestCombineFileInputFormat 
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
   
+  public void testNodeInputSplit() throws IOException, InterruptedException {
+    // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on 
+    // both nodes. The grouping ensures that both nodes get splits instead of 
+    // just the first node
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 12;
+    long totLength = 0;
+    long blockSize = 100;
+    long maxSize = 200;
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    String[] locations = { "h1", "h2" };
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+    
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+    for(int i=0; i<numBlocks; ++i) {
+      blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
+      totLength += blockSize;
+    }
+    
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes = 
+                              new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes = 
+                              new HashMap<OneBlockInfo, String[]>();
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                             nodeToBlocks, rackToNodes);
+    
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,  
+                          maxSize, minSizeNode, minSizeRack, splits);
+    
+    int expectedSplitCount = (int)(totLength/maxSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+    HashMultiset<String> nodeSplits = HashMultiset.create();
+    for(int i=0; i<expectedSplitCount; ++i) {
+      InputSplit inSplit = splits.get(i);
+      Assert.assertEquals(maxSize, inSplit.getLength());
+      Assert.assertEquals(1, inSplit.getLocations().length);
+      nodeSplits.add(inSplit.getLocations()[0]);
+    }
+    Assert.assertEquals(3, nodeSplits.count(locations[0]));
+    Assert.assertEquals(3, nodeSplits.count(locations[1]));
+  }
+  
   public void testSplitPlacementForCompressedFiles() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
@@ -889,24 +942,24 @@ public class TestCombineFileInputFormat 
       assertEquals(f3.getLen(), fileSplit.getLength(0));
       assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f4.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits.get(2);
       assertEquals(1, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits.get(3);
       assertEquals(1, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r1
 
       // maximum split size is twice file1's length
       inFormat = new DummyInputFormat();



Mime
View raw message