storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [5/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
Date Thu, 07 Sep 2017 18:09:23 GMT
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* change all collections to arrays and pre-allocate them
* use a fixed (static) length for choices
* pre-allocate a backup array for choices and swap the two on update, to avoid allocating new arrays


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08038b6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08038b6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08038b6d

Branch: refs/heads/master
Commit: 08038b6d65462788796348e40adbacfa6d2f9467
Parents: a1fd83a
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Wed Aug 9 07:18:21 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Wed Aug 9 13:23:50 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 103 ++++++++++++-------
 .../grouping/LoadAwareShuffleGroupingTest.java  |  27 ++---
 2 files changed, 77 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/08038b6d/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 20437a1..080c48d 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -27,18 +27,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable
{
-    private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+    private static final int CAPACITY = 1000;
 
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
-    private ArrayList<List<Integer>> choices;
+    private int[] loads;
+    private int[] unassigned;
+    private int[] choices;
+    private int[] prepareChoices;
     private AtomicInteger current;
-    private int actualCapacity = 0;
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer>
targetTasks) {
@@ -51,18 +54,22 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
             targets[i] = targetTasks.get(i);
         }
 
-        actualCapacity = targets.length * CAPACITY_TASK_MULTIPLICATION;
-
         // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
-        choices = new ArrayList<>(actualCapacity);
-        int index = 0;
-        while (index < actualCapacity) {
-            choices.add(rets[index % targets.length]);
-            index++;
+        choices = new int[CAPACITY];
+
+        for (int i = 0 ; i < CAPACITY ; i++) {
+            choices[i] = i % rets.length;
         }
 
-        Collections.shuffle(choices, random);
+        shuffleArray(choices);
         current = new AtomicInteger(0);
+
+        // allocate another array to be switched
+        prepareChoices = new int[CAPACITY];
+
+        // allocating only once
+        loads = new int[targets.length];
+        unassigned = new int[targets.length];
     }
 
     @Override
@@ -78,14 +85,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping
load) {
         int rightNow;
-        int size = choices.size();
         while (true) {
             rightNow = current.incrementAndGet();
-            if (rightNow < size) {
-                return choices.get(rightNow);
-            } else if (rightNow == size) {
+            if (rightNow < CAPACITY) {
+                return rets[choices[rightNow]];
+            } else if (rightNow == CAPACITY) {
                 current.set(0);
-                return choices.get(0);
+                return rets[choices[0]];
             }
             //race condition with another thread, and we lost
             // try again
@@ -94,45 +100,70 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     private void updateRing(LoadMapping load) {
         int localTotal = 0;
-        int[] loads = new int[targets.length];
         for (int i = 0 ; i < targets.length; i++) {
             int val = (int)(101 - (load.get(targets[i]) * 100));
             loads[i] = val;
             localTotal += val;
         }
 
-        // allocating enough memory doesn't hurt much, so assign aggressively
-        // we will cut out if actual size becomes bigger than actualCapacity
-        ArrayList<List<Integer>> newChoices = new ArrayList<>(actualCapacity
+ targets.length);
+        int currentIdx = 0;
+        int unassignedIdx = 0;
         for (int i = 0 ; i < loads.length ; i++) {
+            if (currentIdx == CAPACITY) {
+                break;
+            }
+
             int loadForTask = loads[i];
-            int amount = loadForTask * actualCapacity / localTotal;
+            int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
             // assign at least one for task
             if (amount == 0) {
-                amount = 1;
+                unassigned[unassignedIdx++] = i;
             }
             for (int j = 0; j < amount; j++) {
-                newChoices.add(rets[i]);
+                if (currentIdx == CAPACITY) {
+                    break;
+                }
+
+                prepareChoices[currentIdx++] = i;
             }
         }
 
-        Collections.shuffle(newChoices, random);
-
-        // make sure length of newChoices is same as actualCapacity, like current choices
-        // this ensures safety when requests and update occurs concurrently
-        if (newChoices.size() > actualCapacity) {
-            newChoices.subList(actualCapacity, newChoices.size()).clear();
-        } else if (newChoices.size() < actualCapacity) {
-            int remaining = actualCapacity - newChoices.size();
-            while (remaining > 0) {
-                newChoices.add(newChoices.get(remaining % newChoices.size()));
-                remaining--;
+        if (currentIdx < CAPACITY) {
+            // if there're some rooms, give unassigned tasks a chance to be included
+            // this should be really small amount, so just add them sequentially
+            if (unassignedIdx > 0) {
+                for (int i = currentIdx ; i < CAPACITY ; i++) {
+                    prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
+                }
+            } else {
+                // just pick random
+                for (int i = currentIdx ; i < CAPACITY ; i++) {
+                    prepareChoices[i] = random.nextInt(loads.length);
+                }
             }
         }
 
-        assert newChoices.size() == actualCapacity;
+        shuffleArray(prepareChoices);
+
+        // swapping two arrays
+        int[] tempForSwap = choices;
+        choices = prepareChoices;
+        prepareChoices = tempForSwap;
 
-        choices = newChoices;
         current.set(0);
     }
+
+    private void shuffleArray(int[] arr) {
+        int size = arr.length;
+        for (int i = size; i > 1; i--) {
+            swap(arr, i - 1, random.nextInt(i));
+        }
+    }
+
+    private void swap(int[] arr, int i, int j) {
+        int tmp = arr[i];
+        arr[i] = arr[j];
+        arr[j] = tmp;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/08038b6d/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index ed08a05..2d90d66 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -71,12 +71,14 @@ public class LoadAwareShuffleGroupingTest {
         // distribution should be even for all nodes when loads are even
         int desiredTaskCountPerTask = 5000;
         int totalEmits = numTasks * desiredTaskCountPerTask;
+        int minPrCount = (int) (totalEmits * ((1.0 / numTasks) - ACCEPTABLE_MARGIN));
+        int maxPrCount = (int) (totalEmits * ((1.0 / numTasks) + ACCEPTABLE_MARGIN));
 
         int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks,
loadMapping);
 
         for (int i = 0; i < numTasks; i++) {
-            assertEquals("Distribution should be even for all nodes", desiredTaskCountPerTask,
-                taskCounts[i]);
+            assertTrue("Distribution should be even for all nodes with small delta",
+                taskCounts[i] >= minPrCount && taskCounts[i] <= maxPrCount);
         }
     }
 
@@ -106,6 +108,7 @@ public class LoadAwareShuffleGroupingTest {
         // distribution should be exactly even
         final int groupingExecutionsPerThread = numTasks * 5000;
         final int numThreads = 10;
+        int totalEmits = groupingExecutionsPerThread * numThreads;
 
         List<Callable<int[]>> threadTasks = Lists.newArrayList();
         for (int x = 0; x < numThreads; x++) {
@@ -149,22 +152,12 @@ public class LoadAwareShuffleGroupingTest {
             }
         }
 
-        int[] loads = new int[numTasks];
-        int localTotal = 0;
-        List<Double> loadRate = new ArrayList<>();
-        for (int i = 0; i < numTasks; i++) {
-            int val = (int)(101 - (loadMapping.get(i) * 100));
-            loads[i] = val;
-            localTotal += val;
-        }
-
-        for (int i = 0; i < numTasks; i++) {
-            loadRate.add(loads[i] * 1.0 / localTotal);
-        }
+        int minPrCount = (int) (totalEmits * ((1.0 / numTasks) - ACCEPTABLE_MARGIN));
+        int maxPrCount = (int) (totalEmits * ((1.0 / numTasks) + ACCEPTABLE_MARGIN));
 
         for (int i = 0; i < numTasks; i++) {
-            int expected = numThreads * groupingExecutionsPerThread / numTasks;
-            assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
+            assertTrue("Distribution should be even for all nodes with small delta",
+                taskIdTotals[i] >= minPrCount && taskIdTotals[i] <= maxPrCount);
         }
     }
 
@@ -441,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
         }
 
         current = System.currentTimeMillis();
-        for (int i = 1; i <= 2_000_000_000 ; i++) {
+        for (int i = 1; i <= 2_000_000_000; i++) {
             grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
         }
 


Mime
View raw message