storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [2/3] storm git commit: Fix bug in BackPressureTracker.recordBackPressure, add some tests
Date Tue, 10 Jul 2018 20:28:27 GMT
Fix bug in BackPressureTracker.recordBackPressure, add some tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f60e943a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f60e943a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f60e943a

Branch: refs/heads/master
Commit: f60e943a388f3e828d10237782c205781c3050bc
Parents: 1613206
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Thu Jul 5 20:20:15 2018 +0200
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Thu Jul 5 20:20:15 2018 +0200

----------------------------------------------------------------------
 .../daemon/worker/BackPressureTracker.java      |   2 +-
 .../daemon/worker/BackPressureTrackerTest.java  | 119 +++++++++++++++++++
 2 files changed, 120 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f60e943a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index 8d96447..a4e87ba 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -59,7 +59,7 @@ public class BackPressureTracker {
      * @return true if an update was recorded, false if taskId is already under BP
      */
     public boolean recordBackPressure(Integer taskId) {
-        return tasks.get(taskId).backpressure.getAndSet(true);
+        return tasks.get(taskId).backpressure.getAndSet(true) == false;
     }
 
     // returns true if there was a change in the BP situation

http://git-wip-us.apache.org/repos/asf/storm/blob/f60e943a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
new file mode 100644
index 0000000..7e891b5
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.storm.utils.JCQueue;
+import org.junit.Test;
+
+public class BackPressureTrackerTest {
+
+    private static final String WORKER_ID = "worker";
+
+    @Test
+    public void testGetBackpressure() {
+        int taskIdNoBackPressure = 1;
+        JCQueue noBackPressureQueue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
+            Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
+
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
+        assertThat(status.bpTasks, is(empty()));
+    }
+
+    @Test
+    public void testSetBackpressure() {
+        int taskIdNoBackPressure = 1;
+        JCQueue noBackPressureQueue = mock(JCQueue.class);
+        int taskIdBackPressure = 2;
+        JCQueue backPressureQueue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskIdNoBackPressure, noBackPressureQueue,
+            taskIdBackPressure, backPressureQueue));
+
+        boolean backpressureChanged = tracker.recordBackPressure(taskIdBackPressure);
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(true));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
+        assertThat(status.bpTasks, contains(taskIdBackPressure));
+    }
+
+    @Test
+    public void testSetBackpressureWithExistingBackpressure() {
+        int taskId = 1;
+        JCQueue queue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        tracker.recordBackPressure(taskId);
+
+        boolean backpressureChanged = tracker.recordBackPressure(taskId);
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(false));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.bpTasks, contains(taskId));
+    }
+
+    @Test
+    public void testRefreshBackpressureWithEmptyOverflow() {
+        int taskId = 1;
+        JCQueue queue = mock(JCQueue.class);
+        when(queue.isEmptyOverflow()).thenReturn(true);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        tracker.recordBackPressure(taskId);
+
+        boolean backpressureChanged = tracker.refreshBpTaskList();
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(true));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.nonBpTasks, contains(taskId));
+    }
+
+    @Test
+    public void testRefreshBackPressureWithNonEmptyOverflow() {
+        int taskId = 1;
+        JCQueue queue = mock(JCQueue.class);
+        when(queue.isEmptyOverflow()).thenReturn(false);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        tracker.recordBackPressure(taskId);
+
+        boolean backpressureChanged = tracker.refreshBpTaskList();
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(false));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.bpTasks, contains(taskId));
+    }
+
+}


Mime
View raw message