carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: [CARBONDATA-171] Block distribution not proper when the number of active executors is more than the node size
Date Tue, 23 Aug 2016 19:39:26 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/branch-0.1 1f4bcdc97 -> e4bb9949e


[CARBONDATA-171] Block distribution not proper when the number of active executors is more than
the node size


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5036af4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5036af4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5036af4d

Branch: refs/heads/branch-0.1
Commit: 5036af4dd5700b07219534ea06b5f6b887bb53cf
Parents: 1f4bcdc
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Tue Aug 23 20:47:28 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Wed Aug 24 01:05:03 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java |  40 ++--
 .../spark/sql/hive/DistributionUtil.scala       |   6 +-
 .../spark/load/CarbonLoaderUtilTest.java        | 191 +++++++++++++++++++
 3 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5036af4d/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index b934b1d..55ff6eb 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1207,9 +1207,11 @@ public final class CarbonLoaderUtil {
         List<Distributable> blockLst = outputMap.get(activeNode);
         if (null == blockLst) {
           blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-          outputMap.put(activeNode, blockLst);
         }
         populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+        if (blockLst.size() > 0) {
+          outputMap.put(activeNode, blockLst);
+        }
       }
     } else {
       for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet())
{
@@ -1275,21 +1277,15 @@ public final class CarbonLoaderUtil {
     // are assigned first
     Collections.sort(multiBlockRelations);
 
-    Set<String> validActiveNodes = new HashSet<String>();
-    // find all the valid active nodes
     for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
       String nodeName = nodeMultiBlockRelation.getNode();
       //assign the block to the node only if the node is active
-      if (null != activeNodes && isActiveExecutor(activeNodes, nodeName)) {
-        validActiveNodes.add(nodeName);
-      }
-    }
-
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
-      String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
-      if (!validActiveNodes.isEmpty() && !validActiveNodes.contains(nodeName)) {
-        continue;
+      String activeExecutor = nodeName;
+      if (null != activeNodes) {
+        activeExecutor = getActiveExecutor(activeNodes, nodeName);
+        if (null == activeExecutor) {
+          continue;
+        }
       }
       // this loop will be for each NODE
       int nodeCapacity = 0;
@@ -1299,14 +1295,14 @@ public final class CarbonLoaderUtil {
         // check if this is already assigned.
         if (uniqueBlocks.contains(block)) {
 
-          if (null == outputMap.get(nodeName)) {
+          if (null == outputMap.get(activeExecutor)) {
             List<Distributable> list =
                 new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(nodeName, list);
+            outputMap.put(activeExecutor, list);
           }
           // assign this block to this node if node has capacity left
           if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(nodeName);
+            List<Distributable> infos = outputMap.get(activeExecutor);
             infos.add(block);
             nodeCapacity++;
             uniqueBlocks.remove(block);
@@ -1326,16 +1322,19 @@ public final class CarbonLoaderUtil {
    * @param nodeName
    * @return returns true if active else false.
    */
-  private static boolean isActiveExecutor(List activeNode, String nodeName) {
+  private static String getActiveExecutor(List activeNode, String nodeName) {
     boolean isActiveNode = activeNode.contains(nodeName);
     if (isActiveNode) {
-      return isActiveNode;
+      return nodeName;
     }
     //if localhost then retrieve the localhost name then do the check
     else if (nodeName.equals("localhost")) {
       try {
         String hostName = InetAddress.getLocalHost().getHostName();
         isActiveNode = activeNode.contains(hostName);
+        if(isActiveNode){
+          return hostName;
+        }
       } catch (UnknownHostException ue) {
         isActiveNode = false;
       }
@@ -1343,11 +1342,14 @@ public final class CarbonLoaderUtil {
       try {
         String hostAddress = InetAddress.getLocalHost().getHostAddress();
         isActiveNode = activeNode.contains(hostAddress);
+        if(isActiveNode){
+          return hostAddress;
+        }
       } catch (UnknownHostException ue) {
         isActiveNode = false;
       }
     }
-    return isActiveNode;
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5036af4d/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index ac32aaf..81abbfb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -130,7 +130,7 @@ object DistributionUtil {
       confExecutors
     } else {nodeMapping.size()}
 
-    var startTime = System.currentTimeMillis();
+    val startTime = System.currentTimeMillis();
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30;
@@ -139,9 +139,9 @@ object DistributionUtil {
       nodes = DistributionUtil.getNodeList(sparkContext)
       maxTimes = maxTimes - 1;
     }
-    var timDiff = System.currentTimeMillis() - startTime;
+    val timDiff = System.currentTimeMillis() - startTime;
     LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
     LOGGER.info("Time elapsed to allocate the required executors : " + (30 - maxTimes) *
500)
-    nodes
+    nodes.distinct
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5036af4d/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
b/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
new file mode 100644
index 0000000..f8ec400
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.load;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to test block distribution functionality
+ */
+public class CarbonLoaderUtilTest {
+  List<Distributable> blockInfos = null;
+  int noOfNodesInput = -1;
+  List<String> activeNode = null;
+  Map<String, List<Distributable>> expected = null;
+  Map<String, List<Distributable>> mapOfNodes = null;
+
+  @Test public void nodeBlockMapping() throws Exception {
+
+    // scenario when the 3 nodes and 3 executors
+    initSet1();
+    Map<String, List<Distributable>> mapOfNodes =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+    // node allocation
+    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
+    // block allocation
+    boolean isEqual = compareResult(expected, mapOfNodes);
+    Assert.assertTrue("Block Allocation", isEqual);
+
+    // 2 node and 3 executors
+    initSet2();
+    mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+    // node allocation
+    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
+    // block allocation
+    isEqual = compareResult(expected, mapOfNodes);
+    Assert.assertTrue("Block Allocation", isEqual);
+
+    // 3 data node and 2 executors
+    initSet3();
+    mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+    // node allocation
+    Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
+    // block allocation
+    isEqual = compareResult(expected, mapOfNodes);
+    Assert.assertTrue("Block Allocation", isEqual);
+  }
+
+  /**
+   * compares the blocks allocation
+   *
+   * @param expectedResult
+   * @param actualResult
+   * @return
+   */
+  private boolean compareResult(Map<String, List<Distributable>> expectedResult,
+      Map<String, List<Distributable>> actualResult) {
+    expectedResult = sortByListSize(expectedResult);
+    actualResult = sortByListSize(actualResult);
+    List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet());
+    List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet());
+    boolean isEqual = expectedList.size() == mapOfNodesList.size();
+    if (isEqual) {
+      for (int i = 0; i < expectedList.size(); i++) {
+        int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size();
+        int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size();
+        isEqual = size1 == size2;
+        if (!isEqual) {
+          break;
+        }
+      }
+    }
+    return isEqual;
+  }
+
+  /**
+   * sort by list size
+   *
+   * @param map
+   * @return
+   */
+  private static Map<String, List<Distributable>> sortByListSize(
+      Map<String, List<Distributable>> map) {
+    List<List<Distributable>> list = new LinkedList(map.entrySet());
+    Collections.sort(list, new Comparator() {
+      public int compare(Object obj1, Object obj2) {
+        if (obj1 == null && obj2 == null) {
+          return 0;
+        } else if (obj1 == null) {
+          return 1;
+        } else if (obj2 == null) {
+          return -1;
+        }
+        int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size();
+        int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size();
+        return size2 - size1;
+      }
+    });
+
+    Map res = new LinkedHashMap();
+    for (Iterator it = list.iterator(); it.hasNext(); ) {
+      Map.Entry entry = (Map.Entry) it.next();
+      res.put(entry.getKey(), entry.getValue());
+    }
+    return res;
+  }
+
+  void initSet1() {
+    blockInfos = new ArrayList<>();
+    activeNode = new ArrayList<>();
+    activeNode.add("node-7");
+    activeNode.add("node-9");
+    activeNode.add("node-11");
+    String[] location = { "node-7", "node-9", "node-11" };
+    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
+    expected = new HashMap<>();
+    expected.put("node-7", blockInfos.subList(0, 2));
+    expected.put("node-9", blockInfos.subList(2, 4));
+    expected.put("node-11", blockInfos.subList(4, 6));
+  }
+
+  void initSet2() {
+    blockInfos = new ArrayList<>();
+    activeNode = new ArrayList<>();
+    activeNode.add("node-7");
+    activeNode.add("node-9");
+    activeNode.add("node-11");
+    String[] location = { "node-7", "node-11" };
+    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
+    expected = new HashMap<>();
+    expected.put("node-7", blockInfos.subList(0, 2));
+    expected.put("node-9", blockInfos.subList(2, 4));
+    expected.put("node-11", blockInfos.subList(4, 6));
+  }
+
+  void initSet3() {
+    blockInfos = new ArrayList<>();
+    activeNode = new ArrayList<>();
+    activeNode.add("node-7");
+    activeNode.add("node-11");
+    String[] location = { "node-7", "node-9", "node-11" };
+    blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
+    blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
+    expected = new HashMap<>();
+    expected.put("node-7", blockInfos.subList(0, 3));
+    expected.put("node-11", blockInfos.subList(3, 6));
+  }
+}
\ No newline at end of file


Mime
View raw message