tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups. (sseth)
Date Tue, 12 Jan 2016 23:16:46 GMT
Repository: tez
Updated Branches:
  refs/heads/master 85637c61a -> 0c085771b


TEZ-2978. Add an option to allow the SplitGrouper to generate node local
only groups. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0c085771
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0c085771
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0c085771

Branch: refs/heads/master
Commit: 0c085771b8c501d4fa492ab7c9dc57a1abcae52b
Parents: 85637c6
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jan 12 15:16:31 2016 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jan 12 15:16:31 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/mapreduce/grouper/TezSplitGrouper.java  |  33 +++-
 .../hadoop/mapred/split/TestGroupedSplits.java  | 192 ++++++++++++++-----
 3 files changed, 180 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0c085771/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea2b1d5..6cdc037 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups.
   TEZ-2129. Task and Attempt views should contain links to the logs
   TEZ-3025. InputInitializer creation should use the dag ugi.
   TEZ-3017. HistoryACLManager does not have a close method for cleanup

http://git-wip-us.apache.org/repos/asf/tez/blob/0c085771/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index 163a2a3..9435e68 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -94,6 +94,13 @@ public abstract class TezSplitGrouper {
   public static final String TEZ_GROUPING_REPEATABLE = "tez.grouping.repeatable";
   public static final boolean TEZ_GROUPING_REPEATABLE_DEFAULT = true;
 
+  /**
+   * Generate node local splits only. This prevents fallback to rack locality etc, and overrides
+   * the target size for small splits.
+   */
+  public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
+  public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
+
 
   static class LocationHolder {
     List<SplitContainer> splits;
@@ -302,6 +309,9 @@ public abstract class TezSplitGrouper {
     boolean groupByCount = conf.getBoolean(
         TEZ_GROUPING_SPLIT_BY_COUNT,
         TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
+    boolean nodeLocalOnly = conf.getBoolean(
+        TEZ_GROUPING_NODE_LOCAL_ONLY,
+        TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT);
     if (!(groupByLength || groupByCount)) {
       throw new TezUncheckedException(
           "None of the grouping parameters are true: "
@@ -315,7 +325,9 @@ public abstract class TezSplitGrouper {
         " numSplitsInGroup: " + numSplitsInGroup +
         " totalLength: " + totalLength +
         " numOriginalSplits: " + originalSplits.size() +
-        " . Grouping by length: " + groupByLength + " count: " + groupByCount);
+        " . Grouping by length: " + groupByLength +
+        " count: " + groupByCount +
+        " nodeLocalOnly: " + nodeLocalOnly);
 
     // go through locations and group splits
     int splitsProcessed = 0;
@@ -332,7 +344,6 @@ public abstract class TezSplitGrouper {
         groupLocationSet.clear();
         String location = entry.getKey();
         LocationHolder holder = entry.getValue();
-        // KKK rename to splitContainer
         SplitContainer splitContainer = holder.getUnprocessedHeadSplit();
         if (splitContainer == null) {
           // all splits on node processed
@@ -402,7 +413,18 @@ public abstract class TezSplitGrouper {
       }
 
       if (!doingRackLocal && numFullGroupsCreated < 1) {
-        // no node could create a node-local group. go rack-local
+        // no node could create a regular node-local group.
+
+        // Allow small groups if that is configured.
+        if (nodeLocalOnly && !allowSmallGroups) {
+          LOG.info(
+              "Allowing small groups early after attempting to create full groups at iteration: {}, groupsCreatedSoFar={}",
+              iterations, groupedSplits.size());
+          allowSmallGroups = true;
+          continue;
+        }
+
+        // else go rack-local
         doingRackLocal = true;
         // re-create locations
         int numRemainingSplits = originalSplits.size() - splitsProcessed;
@@ -601,6 +623,11 @@ public abstract class TezSplitGrouper {
       return this;
     }
 
+    public TezMRSplitsGrouperConfigBuilder setNodeLocalGroupsOnly(boolean nodeLocalGroupsOnly) {
+      this.conf.setBoolean(TEZ_GROUPING_NODE_LOCAL_ONLY, nodeLocalGroupsOnly);
+      return this;
+    }
+
     /**
      * upper and lower bounds for the splits
      */

http://git-wip-us.apache.org/repos/asf/tez/blob/0c085771/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
index 140a09d..fba72a3 100644
--- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
+++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java
@@ -28,10 +28,13 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
@@ -57,6 +60,9 @@ import org.junit.Test;
 
 import com.google.common.collect.Sets;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 public class TestGroupedSplits {
@@ -118,9 +124,9 @@ public class TestGroupedSplits {
 
       // we should have a single split as the length is comfortably smaller than
       // the block size
-      Assert.assertEquals("We got more than one splits!", 1, splits.length);
+      assertEquals("We got more than one splits!", 1, splits.length);
       InputSplit split = splits[0];
-      Assert.assertEquals("It should be TezGroupedSplit",
+      assertEquals("It should be TezGroupedSplit",
         TezGroupedSplit.class, split.getClass());
 
       // check the split
@@ -137,7 +143,7 @@ public class TestGroupedSplits {
             LOG.warn("conflict with " + v +
                      " at position "+reader.getPos());
           }
-          Assert.assertFalse("Key in multiple partitions.", bits.get(v));
+          assertFalse("Key in multiple partitions.", bits.get(v));
           bits.set(v);
           count++;
         }
@@ -145,7 +151,7 @@ public class TestGroupedSplits {
       } finally {
         reader.close();
       }
-      Assert.assertEquals("Some keys in no partition.", length, bits.cardinality());
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
     }
   }
 
@@ -260,14 +266,14 @@ public class TestGroupedSplits {
       if (j==1) {
         // j==1 covers single split corner case
         // and does not do grouping
-        Assert.assertEquals("compressed splits == " + j, j, splits.length);
+        assertEquals("compressed splits == " + j, j, splits.length);
       }
       List<Text> results = new ArrayList<Text>();
       for (int i=0; i<splits.length; ++i) { 
         List<Text> read = readSplit(format, splits[i], job);
         results.addAll(read);
       }
-      Assert.assertEquals("splits length", 11, results.size());
+      assertEquals("splits length", 11, results.size());
   
       final String[] firstList =
         {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
@@ -293,7 +299,7 @@ public class TestGroupedSplits {
 
   private static int testResults(List<Text> results, String[] first, int start) {
     for (int i = 0; i < first.length; i++) {
-      Assert.assertEquals("splits["+i+"]", first[i], results.get(start+i).toString());
+      assertEquals("splits["+i+"]", first[i], results.get(start+i).toString());
     }
     return first.length+start;
   }  
@@ -324,17 +330,17 @@ public class TestGroupedSplits {
     // desired splits not set. We end up choosing min/max split size based on 
     // total data and num original splits. In this case, min size will be hit
     InputSplit[] splits = format.getSplits(job, 0);
-    Assert.assertEquals(25, splits.length);
+    assertEquals(25, splits.length);
     
     // split too big. override with max
     format.setDesiredNumberOfSplits(1);
     splits = format.getSplits(job, 0);
-    Assert.assertEquals(4, splits.length);
+    assertEquals(4, splits.length);
     
     // splits too small. override with min
     format.setDesiredNumberOfSplits(1000);
     splits = format.getSplits(job, 0);
-    Assert.assertEquals(25, splits.length);
+    assertEquals(25, splits.length);
     
   }
   
@@ -398,7 +404,7 @@ public class TestGroupedSplits {
     // the remainig 3 splits (1 from each node) will be grouped at rack level (default-rack)
     // all of them will maintain ordering
     InputSplit[] groupedSplits = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat");
-    Assert.assertEquals(4, groupedSplits.length);
+    assertEquals(4, groupedSplits.length);
     for (int i=0; i<4; ++i) {
       TezGroupedSplit split = (TezGroupedSplit)groupedSplits[i];
       List<InputSplit> innerSplits = split.getGroupedSplits();
@@ -406,12 +412,12 @@ public class TestGroupedSplits {
       // splits in group maintain original order
       for (InputSplit innerSplit : innerSplits) {
         int splitPos = ((TestInputSplit) innerSplit).getPosition();
-        Assert.assertTrue(pos < splitPos);
+        assertTrue(pos < splitPos);
         pos = splitPos;
       }
       // last one is rack split
       if (i==3) {
-        Assert.assertTrue(split.getRack() != null);
+        assertTrue(split.getRack() != null);
       }
     }
   }
@@ -456,24 +462,25 @@ public class TestGroupedSplits {
     // all of them will maintain ordering
     InputSplit[] groupedSplits1 = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat");
     InputSplit[] groupedSplits2 = grouper.getGroupedSplits(conf, origSplits, 4, "InputFormat");
-    Assert.assertEquals(4, groupedSplits1.length);
-    Assert.assertEquals(4, groupedSplits2.length);
+    // KKK Start looking here.
+    assertEquals(4, groupedSplits1.length);
+    assertEquals(4, groupedSplits2.length);
     // check both split groups are the same. this depends on maintaining split order tested above
     for (int i=0; i<4; ++i) {
       TezGroupedSplit gSplit1 = ((TezGroupedSplit) groupedSplits1[i]);
       List<InputSplit> testSplits1 = gSplit1.getGroupedSplits();
       TezGroupedSplit gSplit2 = ((TezGroupedSplit) groupedSplits2[i]);
       List<InputSplit> testSplits2 = gSplit2.getGroupedSplits();
-      Assert.assertEquals(testSplits1.size(), testSplits2.size());
+      assertEquals(testSplits1.size(), testSplits2.size());
       for (int j=0; j<testSplits1.size(); j++) {
         TestInputSplit split1 = (TestInputSplit) testSplits1.get(j);
         TestInputSplit split2 = (TestInputSplit) testSplits2.get(j);
-        Assert.assertEquals(split1.position, split2.position);
+        assertEquals(split1.position, split2.position);
       }
       if (i==3) {
         // check for rack split creation. Ensures repeatability holds for rack splits also
-        Assert.assertTrue(gSplit1.getRack() != null);
-        Assert.assertTrue(gSplit2.getRack() != null);
+        assertTrue(gSplit1.getRack() != null);
+        assertTrue(gSplit2.getRack() != null);
       }
     }
   }
@@ -502,12 +509,12 @@ public class TestGroupedSplits {
     
     format.setDesiredNumberOfSplits(1);
     InputSplit[] splits = format.getSplits(job, 1);
-    Assert.assertEquals(1, splits.length);
+    assertEquals(1, splits.length);
     TezGroupedSplit split = (TezGroupedSplit) splits[0];
     // all 3 splits are present
-    Assert.assertEquals(numSplits, split.wrappedSplits.size());
+    assertEquals(numSplits, split.wrappedSplits.size());
     Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits);
-    Assert.assertEquals(numSplits, splitSet.size());
+    assertEquals(numSplits, splitSet.size());
   }
   
   @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -540,10 +547,10 @@ public class TestGroupedSplits {
     
     format.setDesiredNumberOfSplits(1);
     InputSplit[] splits = format.getSplits(job, 1);
-    Assert.assertEquals(1, splits.length);
+    assertEquals(1, splits.length);
     TezGroupedSplit split = (TezGroupedSplit) splits[0];
     // all 3 splits are present
-    Assert.assertEquals(numSplits, split.wrappedSplits.size());
+    assertEquals(numSplits, split.wrappedSplits.size());
     ByteArrayOutputStream bOut = new ByteArrayOutputStream();
     split.write(new DataOutputStream(bOut));
   }
@@ -589,17 +596,17 @@ public class TestGroupedSplits {
 
     format.setDesiredNumberOfSplits(numSplits);
     InputSplit[] splits = format.getSplits(job, 1);
-    Assert.assertEquals(numSplits, splits.length);
+    assertEquals(numSplits, splits.length);
     for (int i = 0 ; i < numSplits ; i++) {
       TezGroupedSplit split = (TezGroupedSplit) splits[i];
       // all 3 splits are present
-      Assert.assertEquals(1, split.wrappedSplits.size());
+      assertEquals(1, split.wrappedSplits.size());
       if (i==3) {
-        Assert.assertEquals(1, split.getLocations().length);
-        Assert.assertEquals(validLocation, split.getLocations()[0]);
+        assertEquals(1, split.getLocations().length);
+        assertEquals(validLocation, split.getLocations()[0]);
       } else if (i==4) {
-        Assert.assertEquals(1, split.getLocations().length);
-        Assert.assertTrue(split.getLocations()[0].equals(validLocation) || split.getLocations()[0].equals(validLocation2));
+        assertEquals(1, split.getLocations().length);
+        assertTrue(split.getLocations()[0].equals(validLocation) || split.getLocations()[0].equals(validLocation2));
       } else {
         Assert.assertNull(split.getLocations());
       }
@@ -662,16 +669,16 @@ public class TestGroupedSplits {
 
     InputSplit[] splits = format.getSplits(job, 1);
     // due to the min = 12Mb
-    Assert.assertEquals(2, splits.length);
+    assertEquals(2, splits.length);
 
     for (InputSplit group : splits) {
       TezGroupedSplit split = (TezGroupedSplit) group;
       if (split.wrappedSplits.size() == 2) {
         // split1+split2
-        Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l);
+        assertEquals(split.getLength(), 2 * 1000 * 1000l);
       } else {
         // split3
-        Assert.assertEquals(split.getLength(), 2 * 1000 * 1000l + 1);
+        assertEquals(split.getLength(), 2 * 1000 * 1000l + 1);
       }
     }
   }
@@ -708,14 +715,14 @@ public class TestGroupedSplits {
         "MockInputForamt", null, locationProvider);
 
     // Sanity. 1 group, with 3 splits.
-    Assert.assertEquals(1, groupedSplits.length);
-    Assert.assertTrue(groupedSplits[0] instanceof  TezGroupedSplit);
+    assertEquals(1, groupedSplits.length);
+    assertTrue(groupedSplits[0] instanceof  TezGroupedSplit);
     TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[0];
-    Assert.assertEquals(3, groupedSplit.getGroupedSplits().size());
+    assertEquals(3, groupedSplit.getGroupedSplits().size());
 
     // Verify that the split ends up being grouped to the custom location.
-    Assert.assertEquals(1, groupedSplit.getLocations().length);
-    Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]);
+    assertEquals(1, groupedSplit.getLocations().length);
+    assertEquals("customLocation", groupedSplit.getLocations()[0]);
   }
 
   // Original splits returned.
@@ -749,15 +756,114 @@ public class TestGroupedSplits {
         "MockInputForamt", null, locationProvider);
 
     // Sanity. 3 group, with 1 split each
-    Assert.assertEquals(3, groupedSplits.length);
+    assertEquals(3, groupedSplits.length);
     for (int i = 0 ; i < 3 ; i++) {
-      Assert.assertTrue(groupedSplits[i] instanceof  TezGroupedSplit);
+      assertTrue(groupedSplits[i] instanceof  TezGroupedSplit);
       TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[i];
-      Assert.assertEquals(1, groupedSplit.getGroupedSplits().size());
+      assertEquals(1, groupedSplit.getGroupedSplits().size());
 
       // Verify the splits have their final location set to customLocation
-      Assert.assertEquals(1, groupedSplit.getLocations().length);
-      Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]);
+      assertEquals(1, groupedSplit.getLocations().length);
+      assertEquals("customLocation", groupedSplit.getLocations()[0]);
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testForceNodeLocalSplits() throws IOException {
+    int numLocations = 7;
+    long splitLen = 100L;
+    String[] locations = new String[numLocations];
+    for (int i = 0; i < numLocations; i++) {
+      locations[i] = "node" + i;
+    }
+
+    // Generate 24 splits (6 per node) spread evenly across node0-node3.
+    // Generate 1 split each on the remaining 3 nodes (4-6)
+    int numSplits = 27;
+    InputSplit[] rawSplits = new InputSplit[numSplits];
+    for (int i = 0; i < 27; i++) {
+      String splitLoc[] = new String[1];
+      if (i < 24) {
+        splitLoc[0] = locations[i % 4];
+      } else {
+        splitLoc[0] = locations[4 + i % 24];
+      }
+      rawSplits[i] = new TestInputSplit(splitLen, splitLoc, i);
+    }
+
+    TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
+    JobConf confDisallowSmallEarly = new JobConf(defaultConf);
+    confDisallowSmallEarly = (JobConf) TezSplitGrouper.newConfigBuilder(confDisallowSmallEarly)
+        .setGroupingSplitSize(splitLen * 3, splitLen * 3)
+        .setGroupingRackSplitSizeReduction(1)
+        .setNodeLocalGroupsOnly(false)
+        .build();
+
+    JobConf confSmallEarly = new JobConf(defaultConf);
+    confSmallEarly = (JobConf) TezSplitGrouper.newConfigBuilder(confSmallEarly)
+        .setGroupingSplitSize(splitLen * 3, splitLen * 3)
+        .setGroupingRackSplitSizeReduction(1)
+        .setNodeLocalGroupsOnly(true)
+        .build();
+
+    // Without early grouping -> 4 * 2 node local, 1 merged - 9 total
+    // With early grouping -> 4 * 2 node local (first 4 nodes), 3 smaller node local (4-6) -> 11 total
+
+    // Requesting 9 based purely on size.
+    InputSplit[] groupedSplitsDisallowSmallEarly =
+        grouper.getGroupedSplits(confDisallowSmallEarly, rawSplits, 9, "InputFormat");
+    assertEquals(9, groupedSplitsDisallowSmallEarly.length);
+    // Verify the actual splits as well.
+    Map<String, MutableInt> matchedLocations = new HashMap<>();
+    verifySplitsFortestAllowSmallSplitsEarly(groupedSplitsDisallowSmallEarly);
+    TezGroupedSplit group = (TezGroupedSplit) groupedSplitsDisallowSmallEarly[8];
+    assertEquals(3, group.getLocations().length);
+    assertEquals(3, group.getGroupedSplits().size());
+    Set<String> exp = Sets.newHashSet(locations[4], locations[5], locations[6]);
+    for (int i = 0; i < 3; i++) {
+      LOG.info(group.getLocations()[i]);
+      exp.remove(group.getLocations()[i]);
+    }
+    assertEquals(0, exp.size());
+
+    InputSplit[] groupedSplitsSmallEarly =
+        grouper.getGroupedSplits(confSmallEarly, rawSplits, 9, "InputFormat");
+    assertEquals(11, groupedSplitsSmallEarly.length);
+    // The first 8 are the larger groups.
+    verifySplitsFortestAllowSmallSplitsEarly(groupedSplitsSmallEarly);
+    exp = Sets.newHashSet(locations[4], locations[5], locations[6]);
+    for (int i = 8; i < 11; i++) {
+      group = (TezGroupedSplit) groupedSplitsSmallEarly[i];
+      assertEquals(1, group.getLocations().length);
+      assertEquals(1, group.getGroupedSplits().size());
+      String matchedLoc = group.getLocations()[0];
+      assertTrue(exp.contains(matchedLoc));
+      exp.remove(matchedLoc);
+    }
+    assertEquals(0, exp.size());
+  }
+
+  private void verifySplitsFortestAllowSmallSplitsEarly(InputSplit[] groupedSplits) throws
+      IOException {
+    Map<String, MutableInt> matchedLocations = new HashMap<>();
+    for (int i = 0; i < 8; i++) {
+      TezGroupedSplit group = (TezGroupedSplit) groupedSplits[i];
+      assertEquals(1, group.getLocations().length);
+      assertEquals(3, group.getGroupedSplits().size());
+      String matchedLoc = group.getLocations()[0];
+      MutableInt count = matchedLocations.get(matchedLoc);
+      if (count == null) {
+        count = new MutableInt(0);
+        matchedLocations.put(matchedLoc, count);
+      }
+      count.increment();
+    }
+    for (Map.Entry<String, MutableInt> entry : matchedLocations.entrySet()) {
+      String loc = entry.getKey();
+      int nodeId = Character.getNumericValue(loc.charAt(loc.length() - 1));
+      assertTrue(nodeId < 4);
+      assertTrue(loc.startsWith("node") && loc.length() == 5);
+      assertEquals(2, entry.getValue().getValue());
     }
   }
 


Mime
View raw message