storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/6] storm git commit: remove array shuffle from ShuffleGrouping when counter resets to zero
Date Wed, 23 Nov 2016 19:39:49 GMT
Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 916250f5f -> 5938f2cba


remove array shuffle from ShuffleGrouping when counter resets to zero


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/980af808
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/980af808
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/980af808

Branch: refs/heads/1.0.x-branch
Commit: 980af8083aaf185fad3db476dd765abe4ea9cda4
Parents: 55f5567
Author: Kevin Peek <kpeek@salesforce.com>
Authored: Mon Nov 21 13:53:33 2016 -0500
Committer: Kevin Peek <kpeek@salesforce.com>
Committed: Mon Nov 21 13:53:33 2016 -0500

----------------------------------------------------------------------
 .../apache/storm/grouping/ShuffleGrouping.java  |  14 +--
 .../storm/grouping/ShuffleGroupingTest.java     | 124 +++++++++++++++++++
 2 files changed, 128 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/980af808/storm-core/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java b/storm-core/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
index e8b1d71..10bfb2a 100644
--- a/storm-core/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
+++ b/storm-core/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
@@ -17,17 +17,13 @@
  */
 package org.apache.storm.grouping;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
     private Random random;
     private ArrayList<List<Integer>> choices;
@@ -54,8 +50,6 @@ public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
                 return choices.get(rightNow);
             } else if (rightNow == size) {
                 current.set(0);
-                //This should be thread safe so long as ArrayList does not have any internal
state that can be messed up by multi-treaded access.
-                Collections.shuffle(choices, random);
                 return choices.get(0);
             }
             //race condition with another thread, and we lost

http://git-wip-us.apache.org/repos/asf/storm/blob/980af808/storm-core/test/jvm/org/apache/storm/grouping/ShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/grouping/ShuffleGroupingTest.java b/storm-core/test/jvm/org/apache/storm/grouping/ShuffleGroupingTest.java
new file mode 100644
index 0000000..2a0b87f
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/grouping/ShuffleGroupingTest.java
@@ -0,0 +1,124 @@
+package org.apache.storm.grouping;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+public class ShuffleGroupingTest {
+
+    /**
+     * Tests that we round robin correctly using ShuffleGrouping implementation.
+     */
+    @Test
+    public void testShuffleGrouping() {
+        final int numTasks = 6;
+
+        final ShuffleGrouping grouper = new ShuffleGrouping();
+
+        // Task Id not used, so just pick a static value
+        final int inputTaskId = 100;
+
+        // Define our taskIds
+        final List<Integer> availableTaskIds = Lists.newArrayList();
+        for (int i = 0; i < numTasks; i++) {
+            availableTaskIds.add(i);
+        }
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouper.prepare(context, null, availableTaskIds);
+
+        // Keep track of how many times we see each taskId
+        int[] taskCounts = new int[numTasks];
+        for (int i = 1; i <= 30000; i++) {
+            List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
+
+            // Validate a single task id return
+            assertNotNull("Not null taskId list returned", taskIds);
+            assertEquals("Single task Id returned", 1, taskIds.size());
+
+            int taskId = taskIds.get(0);
+
+            assertTrue("TaskId should exist", taskId >= 0 && taskId < numTasks);
+            taskCounts[taskId]++;
+        }
+
+        for (int i = 0; i < numTasks; i++) {
+            assertEquals("Distribution should be even for all nodes", 5000, taskCounts[i]);
+        }
+    }
+
+    /**
+     * Tests that we round robin correctly with multiple threads using ShuffleGrouping implementation.
+     */
+    @Test
+    public void testShuffleGroupMultiThreaded() throws InterruptedException, ExecutionException {
+        final int numTasks = 6;
+        final int groupingExecutionsPerThread = 30000;
+        final int numThreads = 10;
+
+        final CustomStreamGrouping grouper = new ShuffleGrouping();
+
+        // Task Id not used, so just pick a static value
+        final int inputTaskId = 100;
+        // Define our taskIds - the test expects these to be incrementing by one up from zero
+        final List<Integer> availableTaskIds = Lists.newArrayList();
+        for (int i = 0; i < numTasks; i++) {
+            availableTaskIds.add(i);
+        }
+
+        final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+        // Call prepare with our available taskIds
+        grouper.prepare(context, null, availableTaskIds);
+
+        List<Callable<int[]>> threadTasks = Lists.newArrayList();
+        for (int x = 0; x < numThreads; x++) {
+            Callable<int[]> threadTask = new Callable<int[]>() {
+                @Override
+                public int[] call() throws Exception {
+                    int[] taskCounts = new int[availableTaskIds.size()];
+                    for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+                        List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
+
+                        // Validate a single task id return
+                        assertNotNull("Not null taskId list returned", taskIds);
+                        assertEquals("Single task Id returned", 1, taskIds.size());
+
+                        int taskId = taskIds.get(0);
+
+                        assertTrue("TaskId should exist", taskId >= 0 && taskId < availableTaskIds.size());
+                        taskCounts[taskId]++;
+                    }
+                    return taskCounts;
+                }
+            };
+            threadTasks.add(threadTask);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+        List<Future<int[]>> taskResults;
+        try {
+            taskResults = executor.invokeAll(threadTasks);
+        } finally {
+            executor.shutdownNow(); // don't let the pool's threads outlive this test
+        }
+
+        // invokeAll has waited for every task; get() rethrows worker assertion failures
+        int[] taskIdTotals = new int[numTasks];
+        for (Future<int[]> taskResult : taskResults) {
+            int[] taskDistributions = taskResult.get();
+            for (int i = 0; i < taskDistributions.length; i++) {
+                taskIdTotals[i] += taskDistributions[i];
+            }
+        }
+
+        for (int i = 0; i < numTasks; i++) {
+            assertEquals(numThreads * groupingExecutionsPerThread / numTasks, taskIdTotals[i]);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message