storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/2] storm git commit: Merge branch 'STORM-2175-1.x' of https://github.com/revans2/incubator-storm into STORM-2175-1.x
Date Thu, 03 Nov 2016 00:15:38 GMT
Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch 5e7476285 -> c842ae28e


Merge branch 'STORM-2175-1.x' of https://github.com/revans2/incubator-storm into STORM-2175-1.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e44f7987
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e44f7987
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e44f7987

Branch: refs/heads/1.0.x-branch
Commit: e44f798739c7e89a9003b77f6b5f9c3e5d3d6369
Parents: 5e74762
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Thu Nov 3 09:08:04 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Thu Nov 3 09:12:57 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  5 +-
 storm-core/src/clj/org/apache/storm/timer.clj   | 10 +--
 .../jvm/org/apache/storm/ProcessSimulator.java  | 14 ++++-
 .../org/apache/storm/command/KillWorkers.java   |  4 +-
 .../daemon/supervisor/ReadClusterState.java     | 65 ++++++++++++++++----
 .../storm/daemon/supervisor/Supervisor.java     |  4 +-
 .../apache/storm/daemon/supervisor/UniFunc.java | 22 +++++++
 .../supervisor/timer/SupervisorHealthCheck.java |  2 +-
 .../org/apache/storm/utils/DisruptorQueue.java  | 10 +--
 9 files changed, 105 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index f7cb49e..93e5ef2 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -23,7 +23,7 @@
              [executor :as executor]])
   (:import [org.apache.commons.io FileUtils]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils])
+           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils ReadClusterState])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -201,8 +201,7 @@
   (.close (:state cluster-map))
   (.disconnect (:storm-cluster-state cluster-map))
   (doseq [s @(:supervisors cluster-map)]
-    (.shutdownAllWorkers s)
-    ;; race condition here? will it launch the workers again?
+    (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
     (.close s))
   (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/clj/org/apache/storm/timer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/timer.clj b/storm-core/src/clj/org/apache/storm/timer.clj
index 0d8839e..6ea2029 100644
--- a/storm-core/src/clj/org/apache/storm/timer.clj
+++ b/storm-core/src/clj/org/apache/storm/timer.clj
@@ -117,11 +117,11 @@
 
 (defn cancel-timer
   [timer]
-  (check-active! timer)
-  (locking (:lock timer)
-    (reset! (:active timer) false)
-    (.interrupt (:timer-thread timer)))
-  (.acquire (:cancel-notifier timer)))
+  (when @(:active timer)
+    (locking (:lock timer)
+      (reset! (:active timer) false)
+      (.interrupt (:timer-thread timer)))
+    (.acquire (:cancel-notifier timer))))
 
 (defn timer-waiting?
   [timer]

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
index 202df12..f8de869 100644
--- a/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
+++ b/storm-core/src/jvm/org/apache/storm/ProcessSimulator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm;
 import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.utils.Utils;
 
 import java.util.Collection;
 import java.util.Set;
@@ -76,7 +77,18 @@ public class ProcessSimulator {
     public static void killAllProcesses() {
         Set<String> pids = processMap.keySet();
         for (String pid : pids) {
-            killProcess(pid);
+            try {
+                killProcess(pid);
+            } catch (Exception e) {
+                if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
+                    LOG.warn("process {} not killed (Ignoring InterruptedException)", pid, e);
+                } else if (e instanceof RuntimeException){
+                    throw e;
+                } else {
+                    //TODO once everything is in java this should not be possible any more
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index 1fcd81b..b9f8c14 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -21,8 +21,6 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.storm.Config;
-import org.apache.storm.utils.Utils;
-
 import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.utils.ConfigUtils;
@@ -32,7 +30,7 @@ public class KillWorkers {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
         conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
         try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
-            supervisor.shutdownAllWorkers();
+            supervisor.shutdownAllWorkers(null, null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 40b4a93..0fa60f1 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -23,8 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -44,6 +44,7 @@ import org.apache.storm.localizer.ILocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -286,30 +287,70 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         return portTasks;
     }
 
-    public synchronized void shutdownAllWorkers() {
+    private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second.  Workers commit suicide after this
+    private static final long ERROR_MILLIS = 60_000; //1 min.  This really means something is wrong.  Even on a very slow node
+    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = new UniFunc<Slot>() {
+        public void call(Slot slot) {
+            throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut down slot " + slot);
+        }
+    };
+    
+    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = new UniFunc<Slot>() {
+        public void call(Slot slot) {
+            LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS, slot);
+        }
+    };
+    
+    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = new UniFunc<Slot>() {
+        public void call(Slot slot) throws Exception {
+            LOG.warn("Shutdown of slot {} appreas to be stuck\n{}", slot, Utils.threadDump());
+            DEFAULT_ON_ERROR_TIMEOUT.call(slot);
+        }
+    };
+    
+    public synchronized void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
         for (Slot slot: slots.values()) {
+            LOG.info("Setting {} assignment to null", slot);
             slot.setNewAssignment(null);
         }
-
+        
+        if (onWarnTimeout == null) {
+            onWarnTimeout = DEFAULT_ON_WARN_TIMEOUT;
+        }
+        
+        if (onErrorTimeout == null) {
+            onErrorTimeout = DEFAULT_ON_ERROR_TIMEOUT;
+        }
+        
+        long startTime = Time.currentTimeMillis();
+        Exception exp = null;
         for (Slot slot: slots.values()) {
+            LOG.info("Waiting for {} to be EMPTY, currently {}", slot, slot.getMachineState());
             try {
-                int count = 0;
                 while (slot.getMachineState() != MachineState.EMPTY) {
-                    if (count > 10) {
-                        LOG.warn("DONE waiting for {} to finish {}", slot, slot.getMachineState());
-                        break;
+                    long timeSpentMillis = Time.currentTimeMillis() - startTime;
+                    if (timeSpentMillis > ERROR_MILLIS) {
+                        onErrorTimeout.call(slot);
+                    }
+                    
+                    if (timeSpentMillis > WARN_MILLIS) {
+                        onWarnTimeout.call(slot);
                     }
                     if (Time.isSimulating()) {
-                        Time.advanceTime(1000);
-                        Thread.sleep(100);
-                    } else {
-                        Time.sleep(100);
+                        Time.advanceTime(100);
                     }
-                    count++;
+                    Thread.sleep(100);
                 }
             } catch (Exception e) {
                 LOG.error("Error trying to shutdown workers in {}", slot, e);
+                exp = e;
+            }
+        }
+        if (exp != null) {
+            if (exp instanceof RuntimeException) {
+                throw (RuntimeException)exp;
             }
+            throw new RuntimeException(exp);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 1399e8d..5232dc3 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -323,9 +323,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
     }
 
-    public void shutdownAllWorkers() {
+    public void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
         if (readState != null) {
-            readState.shutdownAllWorkers();
+            readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
         } else {
             try {
                 ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
new file mode 100644
index 0000000..9662af5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public interface UniFunc<T> {
+    public void call(T arg) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 0017092..e67cfc7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -35,7 +35,7 @@ public class SupervisorHealthCheck implements Runnable {
         Map<String, Object> conf = supervisor.getConf();
         int healthCode = HealthCheck.healthCheck(conf);
         if (healthCode != 0) {
-            supervisor.shutdownAllWorkers();
+            supervisor.shutdownAllWorkers(null, null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e44f7987/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index ef6f65b..fbae1d1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -107,10 +107,12 @@ public class DisruptorQueue implements IStatefulObject {
 
         public synchronized void stop(Flusher flusher, long flushInterval) {
             ArrayList<Flusher> pending = _pendingFlush.get(flushInterval);
-            pending.remove(flusher);
-            if (pending.size() == 0) {
-                _pendingFlush.remove(flushInterval);
-                _tt.remove(flushInterval).cancel();
+            if (pending != null) {
+                pending.remove(flusher);
+                if (pending.size() == 0) {
+                    _pendingFlush.remove(flushInterval);
+                    _tt.remove(flushInterval).cancel();
+                }
             }
         }
     }


Mime
View raw message