storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [4/6] storm git commit: STORM-2175: Allow for longer timeout and different operations on failures
Date Thu, 03 Nov 2016 00:21:10 GMT
STORM-2175: Allow for longer timeout and different operations on failures

Conflicts:
	storm-core/src/clj/org/apache/storm/testing.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a389e93
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a389e93
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a389e93

Branch: refs/heads/master
Commit: 3a389e93aeeadd28e0409584e8a14bd56a7b8f52
Parents: 3e07195
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Tue Nov 1 10:26:06 2016 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Tue Nov 1 10:59:07 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  5 ++-
 .../org/apache/storm/command/KillWorkers.java   |  2 +-
 .../daemon/supervisor/ReadClusterState.java     | 33 +++++++++++++++-----
 .../storm/daemon/supervisor/Supervisor.java     |  4 +--
 .../apache/storm/daemon/supervisor/UniFunc.java | 22 +++++++++++++
 .../supervisor/timer/SupervisorHealthCheck.java |  2 +-
 6 files changed, 54 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3a389e93/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 5a03755..5081aa4 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -25,7 +25,7 @@
            [org.apache.storm.utils]
            [org.apache.storm.zookeeper Zookeeper]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils]
+           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils ReadClusterState]
            [org.apache.storm.executor Executor]
            [java.util.concurrent.atomic AtomicBoolean])
   (:import [java.io File])
@@ -247,8 +247,7 @@
   (.close (:state cluster-map))
   (.disconnect (:storm-cluster-state cluster-map))
   (doseq [s @(:supervisors cluster-map)]
-    (.shutdownAllWorkers s false)
-    ;; race condition here? will it launch the workers again?
+    (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
     (.close s))
   (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))

http://git-wip-us.apache.org/repos/asf/storm/blob/3a389e93/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index 9ca4da2..b9f8c14 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -30,7 +30,7 @@ public class KillWorkers {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
         conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
         try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor())) {
-            supervisor.shutdownAllWorkers(true);
+            supervisor.shutdownAllWorkers(null, null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a389e93/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 6862340..222f3b0 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -23,8 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -44,6 +44,7 @@ import org.apache.storm.localizer.ILocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -286,26 +287,44 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         return portTasks;
     }
 
-    private static final long WARN_MILLIS = 1_000; //Warn about a shutdown that takes longer than 1 second (default timeout)
-    private static final long ERROR_MILLIS = 10_000; //Throw an exception if after 10 seconds.
+    private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second.  Workers commit suicide after this
+    private static final long ERROR_MILLIS = 60_000; //1 min.  This really means something is wrong.  Even on a very slow node
+    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = (slot) -> {
+        throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut down slot " + slot);
+    };
+    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = (slot) -> LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS, slot);
+    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = (slot) -> {
+        LOG.warn("Shutdown of slot {} appreas to be stuck\n{}", slot, Utils.threadDump());
+        DEFAULT_ON_ERROR_TIMEOUT.call(slot);
+    };
     
-    public synchronized void shutdownAllWorkers(boolean enableErrorOnTimeout) {
+    public synchronized void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
         for (Slot slot: slots.values()) {
+            LOG.info("Setting {} assignment to null", slot);
             slot.setNewAssignment(null);
         }
         
+        if (onWarnTimeout == null) {
+            onWarnTimeout = DEFAULT_ON_WARN_TIMEOUT;
+        }
+        
+        if (onErrorTimeout == null) {
+            onErrorTimeout = DEFAULT_ON_ERROR_TIMEOUT;
+        }
+        
         long startTime = Time.currentTimeMillis();
         Exception exp = null;
         for (Slot slot: slots.values()) {
+            LOG.info("Waiting for {} to be EMPTY, currently {}", slot, slot.getMachineState());
             try {
                 while (slot.getMachineState() != MachineState.EMPTY) {
                     long timeSpentMillis = Time.currentTimeMillis() - startTime;
-                    if (enableErrorOnTimeout && timeSpentMillis > ERROR_MILLIS) {
-                        throw new IllegalStateException("It took over " + timeSpentMillis + "ms to shut down slot " + slot);
+                    if (timeSpentMillis > ERROR_MILLIS) {
+                        onErrorTimeout.call(slot);
                     }
                     
                     if (timeSpentMillis > WARN_MILLIS) {
-                        LOG.warn("It has taken {}ms so far and {} is still not shut down.", timeSpentMillis, slot);
+                        onWarnTimeout.call(slot);
                     }
                     if (Time.isSimulating()) {
                         Time.advanceTime(100);

http://git-wip-us.apache.org/repos/asf/storm/blob/3a389e93/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index e1e269c..ec791ea 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -318,9 +318,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
     }
 
-    public void shutdownAllWorkers(boolean enableErrorOnTimeout) {
+    public void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
         if (readState != null) {
-            readState.shutdownAllWorkers(enableErrorOnTimeout);
+            readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
         } else {
             try {
                 ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());

http://git-wip-us.apache.org/repos/asf/storm/blob/3a389e93/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
new file mode 100644
index 0000000..9662af5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public interface UniFunc<T> {
+    public void call(T arg) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a389e93/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index a23351b..e67cfc7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -35,7 +35,7 @@ public class SupervisorHealthCheck implements Runnable {
         Map<String, Object> conf = supervisor.getConf();
         int healthCode = HealthCheck.healthCheck(conf);
         if (healthCode != 0) {
-            supervisor.shutdownAllWorkers(true);
+            supervisor.shutdownAllWorkers(null, null);
         }
     }
 }


Mime
View raw message