storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/7] storm git commit: STORM-2175: support at least once shutdown in worker
Date Thu, 03 Nov 2016 00:20:58 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 1d7aeaaee -> 109448005


STORM-2175: support at least once shutdown in worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ec871d1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ec871d1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ec871d1b

Branch: refs/heads/1.x-branch
Commit: ec871d1b9f6ac9f113edbe06eaa32682bd95d5dd
Parents: 17218ea
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Fri Oct 28 15:56:19 2016 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Fri Oct 28 15:56:19 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/timer.clj         | 10 +++++-----
 .../src/jvm/org/apache/storm/ProcessSimulator.java    | 14 +++++++++++++-
 .../jvm/org/apache/storm/utils/DisruptorQueue.java    | 10 ++++++----
 3 files changed, 24 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ec871d1b/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
index 0d8839e..6ea2029 100644
--- a/storm-core/src/clj/org/apache/storm/timer.clj
+++ b/storm-core/src/clj/org/apache/storm/timer.clj
@@ -117,11 +117,11 @@
 
 (defn cancel-timer
   [timer]
-  (check-active! timer)
-  (locking (:lock timer)
-    (reset! (:active timer) false)
-    (.interrupt (:timer-thread timer)))
-  (.acquire (:cancel-notifier timer)))
+  (when @(:active timer)
+    (locking (:lock timer)
+      (reset! (:active timer) false)
+      (.interrupt (:timer-thread timer)))
+    (.acquire (:cancel-notifier timer))))
 
 (defn timer-waiting?
   [timer]

http://git-wip-us.apache.org/repos/asf/storm/blob/ec871d1b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
index 202df12..f8de869 100644
--- a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
+++ b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm;
 import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.utils.Utils;
 
 import java.util.Collection;
 import java.util.Set;
@@ -76,7 +77,18 @@ public class ProcessSimulator {
     public static void killAllProcesses() {
         Set<String> pids = processMap.keySet();
         for (String pid : pids) {
-            killProcess(pid);
+            try {
+                killProcess(pid);
+            } catch (Exception e) {
+                if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
+                    LOG.warn("process {} not killed (Ignoring InterruptedException)", pid, e);
+                } else if (e instanceof RuntimeException){
+                    throw e;
+                } else {
+                    //TODO once everything is in java this should not be possible any more
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ec871d1b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index ef6f65b..fbae1d1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -107,10 +107,12 @@ public class DisruptorQueue implements IStatefulObject {
 
         public synchronized void stop(Flusher flusher, long flushInterval) {
             ArrayList<Flusher> pending = _pendingFlush.get(flushInterval);
-            pending.remove(flusher);
-            if (pending.size() == 0) {
-                _pendingFlush.remove(flushInterval);
-                _tt.remove(flushInterval).cancel();
+            if (pending != null) {
+                pending.remove(flusher);
+                if (pending.size() == 0) {
+                    _pendingFlush.remove(flushInterval);
+                    _tt.remove(flushInterval).cancel();
+                }
             }
         }
     }


Mime
View raw message