storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [2/4] storm git commit: STORM-3148: Fix backpressure issue with system bolt
Date Fri, 13 Jul 2018 23:03:19 GMT
STORM-3148:  Fix backpressure issue with system bolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63017568
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63017568
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63017568

Branch: refs/heads/master
Commit: 63017568eb543571804d0475c975a9c5382465f9
Parents: 911a0d7
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Thu Jul 12 16:49:04 2018 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Thu Jul 12 16:49:04 2018 -0500

----------------------------------------------------------------------
 .../storm/daemon/worker/BackPressureTracker.java       | 13 ++++++++-----
 .../storm/messaging/netty/StormClientHandler.java      | 12 ++++++++++--
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63017568/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index a4e87ba..7e98658 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -81,11 +81,14 @@ public class BackPressureTracker {
         ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
 
         for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
-            boolean backpressure = entry.getValue().backpressure.get();
-            if (backpressure) {
-                bpTasks.add(entry.getKey());
-            } else {
-                nonBpTasks.add(entry.getKey());
+            //System bolt is not a part of backpressure.
+            if (entry.getKey() >= 0) {
+                boolean backpressure = entry.getValue().backpressure.get();
+                if (backpressure) {
+                    bpTasks.add(entry.getKey());
+                } else {
+                    nonBpTasks.add(entry.getKey());
+                }
             }
         }
         return new BackPressureStatus(workerId, bpTasks, nonBpTasks);

http://git-wip-us.apache.org/repos/asf/storm/blob/63017568/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 4c38344..a903264 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -47,12 +47,20 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
             BackPressureStatus status = (BackPressureStatus) message;
             if (status.bpTasks != null) {
                 for (Integer bpTask : status.bpTasks) {
-                    remoteBpStatus[bpTask].set(true);
+                    try {
+                        remoteBpStatus[bpTask].set(true);
+                    } catch (ArrayIndexOutOfBoundsException e) {
+                        LOG.error("BP index out of bounds {}", e);
+                    }
                 }
             }
             if (status.nonBpTasks != null) {
                 for (Integer bpTask : status.nonBpTasks) {
-                    remoteBpStatus[bpTask].set(false);
+                    try {
+                        remoteBpStatus[bpTask].set(false);
+                    } catch (ArrayIndexOutOfBoundsException e) {
+                        LOG.error("BP index out of bounds {}", e);
+                    }
                 }
             }
             LOG.debug("Received BackPressure status update : {}", status);


Mime
View raw message