storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: [STORM-1574][1.x-branch] Better handle backpressure exception & clear dir
Date Tue, 01 Mar 2016 19:17:18 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch f665cf203 -> 3a6e3e47c


[STORM-1574][1.x-branch] Better handle backpressure exception & clear dir


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/32721e84
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/32721e84
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/32721e84

Branch: refs/heads/1.x-branch
Commit: 32721e8470805282620eda75f0b560c895cf4669
Parents: d6cc7a0
Author: zhuol <zhuol@yahoo-inc.com>
Authored: Fri Feb 26 10:07:30 2016 -0600
Committer: zhuol <zhuol@yahoo-inc.com>
Committed: Fri Feb 26 10:07:30 2016 -0600

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/cluster.clj |  7 ++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  6 ++-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  8 ++--
 .../storm/utils/WorkerBackpressureCallback.java |  3 +-
 .../storm/utils/WorkerBackpressureThread.java   | 38 +++++++++++----
 .../utils/WorkerBackpressureThreadTest.java     | 50 ++++++++++++++++++++
 6 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/32721e84/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 5aef266..9b01df9 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -81,6 +81,7 @@
   (worker-backpressure! [this storm-id node port info])
   (topology-backpressure [this storm-id callback])
   (setup-backpressure! [this storm-id])
+  (remove-backpressure! [this storm-id])
   (remove-worker-backpressure! [this storm-id node port])
   (activate-storm! [this storm-id storm-base])
   (update-storm! [this storm-id new-elems])
@@ -485,7 +486,7 @@
     
       (topology-backpressure
         [this storm-id callback]
-        "if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not."
+        "if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off."
         (when callback
           (swap! backpressure-callback assoc storm-id callback))
         (let [path (backpressure-storm-root storm-id)
@@ -496,6 +497,10 @@
         [this storm-id]
         (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
 
+      (remove-backpressure!
+        [this storm-id]
+        (.delete_node cluster-state (backpressure-storm-root storm-id)))
+
       (remove-worker-backpressure!
         [this storm-id node port]
         (.delete_node cluster-state (backpressure-path storm-id node port)))

http://git-wip-us.apache.org/repos/asf/storm/blob/32721e84/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 9376d6e..ec5a624 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1523,7 +1523,8 @@
               (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
               (wait-for-desired-code-replication nimbus total-storm-conf storm-id)
               (.setup-heartbeats! storm-cluster-state storm-id)
-              (.setup-backpressure! storm-cluster-state storm-id)
+              (if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
+                (.setup-backpressure! storm-cluster-state storm-id))
               (notify-topology-action-listener nimbus storm-name "submitTopology")
               (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                               TopologyInitialStatus/ACTIVE :active}]
@@ -1546,6 +1547,7 @@
         (mark! nimbus:num-killTopologyWithOpts-calls)
         (check-storm-active! nimbus storm-name true)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
+              storm-id (topology-conf STORM-ID)
               operation "killTopology"]
           (check-authorization! nimbus storm-name topology-conf operation)
           (let [wait-amt (if (.is_set_wait_secs options)
@@ -1553,6 +1555,8 @@
                            )]
             (transition-name! nimbus storm-name [:kill wait-amt] true)
             (notify-topology-action-listener nimbus storm-name operation))
+          (if (topology-conf TOPOLOGY-BACKPRESSURE-ENABLE)
+            (.remove-backpressure! (:storm-cluster-state nimbus) storm-id))
           (add-topology-to-history-log (get-storm-id (:storm-cluster-state nimbus) storm-name)
             nimbus topology-conf)))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/32721e84/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index edef2c4..9d9c482 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -146,7 +146,10 @@
         ;; update the worker's backpressure flag to zookeeper only when it has changed
         (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag)
         (when (not= prev-backpressure-flag @(:backpressure worker))
-          (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker)))
+          (try
+            (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker))
+            (catch Exception exc
+              (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
         ))))
 
 (defn- mk-disruptor-backpressure-handler [worker]
@@ -669,8 +672,7 @@
                     (.interrupt transfer-thread)
                     (.join transfer-thread)
                     (log-message "Shut down transfer thread")
-                    (.interrupt backpressure-thread)
-                    (.join backpressure-thread)
+                    (.terminate backpressure-thread)
                     (log-message "Shut down backpressure thread")
                     (cancel-timer (:heartbeat-timer worker))
                     (cancel-timer (:refresh-connections-timer worker))

http://git-wip-us.apache.org/repos/asf/storm/blob/32721e84/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
index 0b3e452..36f62a9 100755
--- a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
@@ -21,6 +21,5 @@ package org.apache.storm.utils;
 
 public interface WorkerBackpressureCallback {
 
-    void onEvent(Object obj) throws Exception;
-
+    void onEvent(Object obj);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/32721e84/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
index 6271198..f3b5a66 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
@@ -16,21 +16,26 @@
  * limitations under the License.
  */
 
-
 package org.apache.storm.utils;
 
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WorkerBackpressureThread extends Thread {
 
-    Object trigger;
-    Object workerData;
-    WorkerBackpressureCallback callback;
+    private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class);
+    private Object trigger;
+    private Object workerData;
+    private WorkerBackpressureCallback callback;
+    private volatile boolean running = true;
 
     public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) {
         this.trigger = trigger;
         this.workerData = workerData;
         this.callback = callback;
+        this.setName("WorkerBackpressureThread");
+        this.setDaemon(true);
+        this.setUncaughtExceptionHandler(new BackpressureUncaughtExceptionHandler());
     }
 
     static public void notifyBackpressureChecker(Object trigger) {
@@ -43,17 +48,32 @@ public class WorkerBackpressureThread extends Thread {
         }
     }
 
+    public void terminate() throws InterruptedException {
+        running = false;
+        interrupt();
+        join();
+    }
+
     public void run() {
-        try {
-            while (true) {
+        while (running) {
+            try {
                 synchronized(trigger) {
                     trigger.wait(100);
                 }
                 callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
+            } catch (InterruptedException interEx) {
+                // ignored, we are shutting down.
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
     }
 }
 
+class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        // note that exception that happens during connecting to ZK has been ignored in the callback implementation
+        LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
+        Runtime.getRuntime().exit(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/32721e84/storm-core/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java b/storm-core/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
new file mode 100644
index 0000000..b8e1770
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkerBackpressureThreadTest extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThreadTest.class);
+
+    @Test
+    public void testNormalEvent() throws Exception {
+        Object trigger = new Object();
+        AtomicLong workerData = new AtomicLong(0);
+        WorkerBackpressureCallback callback = new WorkerBackpressureCallback() {
+            @Override
+            public void onEvent(Object obj) {
+                ((AtomicLong) obj).getAndDecrement();
+            }
+        };
+        WorkerBackpressureThread workerBackpressureThread = new WorkerBackpressureThread(trigger, workerData, callback);
+        workerBackpressureThread.start();
+        WorkerBackpressureThread.notifyBackpressureChecker(trigger);
+        long start = System.currentTimeMillis();
+        while (workerData.get() == 0) {
+            assertTrue("Timeout", (System.currentTimeMillis() - start) < 1000);
+            Thread.sleep(100);
+        }
+    }
+}


Mime
View raw message