storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: STORM-3141: Fix NPE in WorkerState.transferLocalBatch, and refactor BackpressureTracker to get rid of placeholder JCQueue
Date Tue, 10 Jul 2018 20:28:26 GMT
Repository: storm
Updated Branches:
  refs/heads/master 2d7c7d316 -> 07c795af5


STORM-3141: Fix NPE in WorkerState.transferLocalBatch, and refactor BackpressureTracker to
get rid of placeholder JCQueue


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/16132068
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/16132068
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/16132068

Branch: refs/heads/master
Commit: 1613206802ffa00f1b6b9ab56a741e86aeff6a5e
Parents: 26d2f95
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Mon Jul 2 21:42:54 2018 +0200
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Mon Jul 2 21:56:27 2018 +0200

----------------------------------------------------------------------
 .../daemon/worker/BackPressureTracker.java      | 63 ++++++++++++--------
 .../apache/storm/daemon/worker/WorkerState.java | 31 ++++------
 .../java/org/apache/storm/MessagingTest.java    | 34 +++++++++++
 3 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index bc396aa..8d96447 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -22,39 +22,35 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.Constants;
 import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.utils.JCQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.stream.Collectors;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
 
 /***
- *   Tracks the BackPressure status using a Map<TaskId, JCQueue>.
- *   Special value NONE, is used to indicate that the task is not under BackPressure
- *   ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
+ *   Tracks the BackPressure status.
  */
 public class BackPressureTracker {
     static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
-    private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null,
-                                                    "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) {
-    };
-    private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+    private final Map<Integer, BackpressureState> tasks;
     private final String workerId;
 
-    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+    public BackPressureTracker(String workerId, Map<Integer, JCQueue> localTasksToQueues) {
         this.workerId = workerId;
-        for (Integer taskId : allLocalTasks) {
-            if (taskId != SYSTEM_TASK_ID) {
-                tasks.put(taskId, NONE);  // all tasks are considered to be not under BP initially
-            }
-        }
+        this.tasks = localTasksToQueues.entrySet().stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey(),
+                entry -> new BackpressureState(entry.getValue())));
     }
 
     private void recordNoBackPressure(Integer taskId) {
-        tasks.put(taskId, NONE);
+        tasks.get(taskId).backpressure.set(false);
     }
 
     /***
@@ -62,16 +58,17 @@ public class BackPressureTracker {
      * This is called by transferLocalBatch() on NettyWorker thread
      * @return true if an update was recorded, false if taskId is already under BP
      */
-    public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
-        return tasks.put(taskId, recvQ) == NONE;
+    public boolean recordBackPressure(Integer taskId) {
+        return tasks.get(taskId).backpressure.getAndSet(true);
     }
 
     // returns true if there was a change in the BP situation
     public boolean refreshBpTaskList() {
         boolean changed = false;
         LOG.debug("Running Back Pressure status change check");
-        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
-            if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) {
+        for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
+            BackpressureState state = entry.getValue();
+            if (state.backpressure.get() && state.queue.isEmptyOverflow()) {
                 recordNoBackPressure(entry.getKey());
                 changed = true;
             }
@@ -83,9 +80,9 @@ public class BackPressureTracker {
         ArrayList<Integer> bpTasks = new ArrayList<>(tasks.size());
         ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
 
-        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
-            JCQueue q = entry.getValue();
-            if (q != NONE) {
+        for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
+            boolean backpressure = entry.getValue().backpressure.get();
+            if (backpressure) {
                 bpTasks.add(entry.getKey());
             } else {
                 nonBpTasks.add(entry.getKey());
@@ -93,4 +90,22 @@ public class BackPressureTracker {
         }
         return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
     }
+    
+    private static class BackpressureState {
+        private final JCQueue queue;
+        //No task is under backpressure initially
+        private final AtomicBoolean backpressure = new AtomicBoolean(false);
+
+        public BackpressureState(JCQueue queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public String toString() {
+            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+                .append(queue)
+                .append(backpressure)
+                .toString();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index ab814da..9510bc0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -114,11 +114,9 @@ public class WorkerState {
     final ReentrantReadWriteLock endpointSocketLock;
     final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
     final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
-    final Map<List<Long>, JCQueue> executorReceiveQueueMap;
     // executor id is in form [start_task_id end_task_id]
-    // short executor id is start_task_id
-    final Map<Integer, JCQueue> shortExecutorReceiveQueueMap;
-    final Map<Integer, Integer> taskToShortExecutor;
+    final Map<List<Long>, JCQueue> executorReceiveQueueMap;
+    final Map<Integer, JCQueue> taskToExecutorQueue;
     final Runnable suicideCallback;
     final Utils.UptimeComputer uptime;
     final Map<String, Object> defaultSharedResources;
@@ -166,12 +164,15 @@ public class WorkerState {
         this.isTopologyActive = new AtomicBoolean(false);
         this.stormComponentToDebug = new AtomicReference<>();
         this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors);
-        this.shortExecutorReceiveQueueMap = new HashMap<>();
         this.localTaskIds = new ArrayList<>();
+        this.taskToExecutorQueue = new HashMap<>();
         this.blobToLastKnownVersion = new ConcurrentHashMap<>();
         for (Map.Entry<List<Long>, JCQueue> entry : executorReceiveQueueMap.entrySet()) {
-            this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
-            this.localTaskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
+            List<Integer> taskIds = StormCommon.executorIdToTasks(entry.getKey());
+            for (Integer taskId : taskIds) {
+                this.taskToExecutorQueue.put(taskId, entry.getValue());
+            }
+            this.localTaskIds.addAll(taskIds);
         }
         Collections.sort(localTaskIds);
         this.topologyConf = topologyConf;
@@ -192,12 +193,6 @@ public class WorkerState {
         this.endpointSocketLock = new ReentrantReadWriteLock();
         this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
         this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
-        this.taskToShortExecutor = new HashMap<>();
-        for (List<Long> executor : this.localExecutors) {
-            for (Integer task : StormCommon.executorIdToTasks(executor)) {
-                taskToShortExecutor.put(task, executor.get(0).intValue());
-            }
-        }
         this.suicideCallback = Utils.mkSuicideFn();
         this.uptime = Utils.makeUptimeComputer();
         this.defaultSharedResources = makeDefaultResources();
@@ -212,7 +207,7 @@ public class WorkerState {
         }
         int maxTaskId = getMaxTaskId(componentToSortedTasks);
         this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
-        this.bpTracker = new BackPressureTracker(workerId, localTaskIds);
+        this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue);
         this.deserializedWorkerHooks = deserializeWorkerHooks();
     }
 
@@ -323,10 +318,6 @@ public class WorkerState {
         return executorReceiveQueueMap;
     }
 
-    public Map<Integer, JCQueue> getShortExecutorReceiveQueueMap() {
-        return shortExecutorReceiveQueueMap;
-    }
-
     public Runnable getSuicideCallback() {
         return suicideCallback;
     }
@@ -531,7 +522,7 @@ public class WorkerState {
 
         for (int i = 0; i < tupleBatch.size(); i++) {
             AddressedTuple tuple = tupleBatch.get(i);
-            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+            JCQueue queue = taskToExecutorQueue.get(tuple.dest);
 
             // 1- try adding to main queue if its overflow is not empty
             if (queue.isEmptyOverflow()) {
@@ -542,7 +533,7 @@ public class WorkerState {
 
             // 2- BP detected (i.e MainQ is full). So try adding to overflow
             int currOverflowCount = queue.getOverflowCount();
-            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+            if (bpTracker.recordBackPressure(tuple.dest)) {
                 receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
                 lastOverflowCount = currOverflowCount;
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-server/src/test/java/org/apache/storm/MessagingTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/MessagingTest.java b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
index 18cfc37..74b7388 100644
--- a/storm-server/src/test/java/org/apache/storm/MessagingTest.java
+++ b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
@@ -58,4 +58,38 @@ public class MessagingTest {
             Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size());
         }
     }
+    
+    @Test
+    public void testRemoteTransportWithManyTasksInReceivingExecutor() throws Exception {
+        //STORM-3141 regression test
+        //Verify that remote worker can handle many tasks in one executor
+        Config topoConf = new Config();
+        topoConf.put(Config.TOPOLOGY_WORKERS, 2);
+        topoConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+
+        try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()
+                                                               .withSupervisors(1).withPortsPerSupervisor(2)
+                                                               .withDaemonConf(topoConf).build()) {
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", new TestWordSpout(true), 1);
+            builder.setBolt("2", new TestGlobalCount(), 1)
+                .setNumTasks(10)
+                .shuffleGrouping("1");
+            StormTopology stormTopology = builder.createTopology();
+
+            List<FixedTuple> fixedTuples = new ArrayList<>();
+            for (int i = 0; i < 12; i++) {
+                fixedTuples.add(new FixedTuple(Collections.singletonList("a")));
+                fixedTuples.add(new FixedTuple(Collections.singletonList("b")));
+            }
+            Map<String, List<FixedTuple>> data = new HashMap<>();
+            data.put("1", fixedTuples);
+            MockedSources mockedSources = new MockedSources(data);
+            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
+            completeTopologyParam.setMockedSources(mockedSources);
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
+            Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size());
+        }
+    }
 }


Mime
View raw message