storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [5/7] storm git commit: STORM-2175: Allow for longer timeout and different operations on failures
Date Thu, 03 Nov 2016 00:21:02 GMT
STORM-2175: Allow for longer timeout and different operations on failures


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9f981e53
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9f981e53
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9f981e53

Branch: refs/heads/1.x-branch
Commit: 9f981e533f2da6cb387658cc0f84b84a1054883e
Parents: 34d608b
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Tue Nov 1 10:26:06 2016 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Tue Nov 1 10:26:06 2016 -0500

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  5 +--
 .../org/apache/storm/command/KillWorkers.java   |  4 +-
 .../daemon/supervisor/ReadClusterState.java     | 43 ++++++++++++++++----
 .../storm/daemon/supervisor/Supervisor.java     |  4 +-
 .../apache/storm/daemon/supervisor/UniFunc.java | 22 ++++++++++
 .../supervisor/timer/SupervisorHealthCheck.java |  2 +-
 6 files changed, 64 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9f981e53/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index afb7b61..c061922 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -23,7 +23,7 @@
              [executor :as executor]])
   (:import [org.apache.commons.io FileUtils]
            [org.apache.storm ProcessSimulator]
-           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils])
+           [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils
ReadClusterState])
   (:import [java.io File])
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
@@ -201,8 +201,7 @@
   (.close (:state cluster-map))
   (.disconnect (:storm-cluster-state cluster-map))
   (doseq [s @(:supervisors cluster-map)]
-    (.shutdownAllWorkers s false)
-    ;; race condition here? will it launch the workers again?
+    (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
     (.close s))
   (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))

http://git-wip-us.apache.org/repos/asf/storm/blob/9f981e53/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
index 0ce40df..b9f8c14 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillWorkers.java
@@ -21,8 +21,6 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.storm.Config;
-import org.apache.storm.utils.Utils;
-
 import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.utils.ConfigUtils;
@@ -32,7 +30,7 @@ public class KillWorkers {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
         conf.put(Config.STORM_LOCAL_DIR, new File((String)conf.get(Config.STORM_LOCAL_DIR)).getCanonicalPath());
         try (Supervisor supervisor = new Supervisor(conf, null, new StandaloneSupervisor()))
{
-            supervisor.shutdownAllWorkers(true);
+            supervisor.shutdownAllWorkers(null, null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9f981e53/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 849ba07..0fa60f1 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -23,8 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -44,6 +44,7 @@ import org.apache.storm.localizer.ILocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -286,26 +287,54 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         return portTasks;
     }
 
-    private static final long WARN_MILLIS = 1_000; //Warn about a shutdown that takes longer
than 1 second (default timeout)
-    private static final long ERROR_MILLIS = 10_000; //Throw an exception if after 10 seconds.
+    private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second.  Workers commit
suicide after this
+    private static final long ERROR_MILLIS = 60_000; //1 min.  This really means something
is wrong.  Even on a very slow node
+    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = new UniFunc<Slot>()
{
+        public void call(Slot slot) {
+            throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut
down slot " + slot);
+        }
+    };
     
-    public synchronized void shutdownAllWorkers(boolean enableErrorOnTimeout) {
+    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = new UniFunc<Slot>()
{
+        public void call(Slot slot) {
+            LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS,
slot);
+        }
+    };
+    
+    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = new UniFunc<Slot>()
{
+        public void call(Slot slot) throws Exception {
+            LOG.warn("Shutdown of slot {} appreas to be stuck\n{}", slot, Utils.threadDump());
+            DEFAULT_ON_ERROR_TIMEOUT.call(slot);
+        }
+    };
+    
+    public synchronized void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot>
onErrorTimeout) {
         for (Slot slot: slots.values()) {
+            LOG.info("Setting {} assignment to null", slot);
             slot.setNewAssignment(null);
         }
         
+        if (onWarnTimeout == null) {
+            onWarnTimeout = DEFAULT_ON_WARN_TIMEOUT;
+        }
+        
+        if (onErrorTimeout == null) {
+            onErrorTimeout = DEFAULT_ON_ERROR_TIMEOUT;
+        }
+        
         long startTime = Time.currentTimeMillis();
         Exception exp = null;
         for (Slot slot: slots.values()) {
+            LOG.info("Waiting for {} to be EMPTY, currently {}", slot, slot.getMachineState());
             try {
                 while (slot.getMachineState() != MachineState.EMPTY) {
                     long timeSpentMillis = Time.currentTimeMillis() - startTime;
-                    if (enableErrorOnTimeout && timeSpentMillis > ERROR_MILLIS)
{
-                        throw new IllegalStateException("It took over " + timeSpentMillis
+ "ms to shut down slot " + slot);
+                    if (timeSpentMillis > ERROR_MILLIS) {
+                        onErrorTimeout.call(slot);
                     }
                     
                     if (timeSpentMillis > WARN_MILLIS) {
-                        LOG.warn("It has taken {}ms so far and {} is still not shut down.",
timeSpentMillis, slot);
+                        onWarnTimeout.call(slot);
                     }
                     if (Time.isSimulating()) {
                         Time.advanceTime(100);

http://git-wip-us.apache.org/repos/asf/storm/blob/9f981e53/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index 5448116..5232dc3 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -323,9 +323,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
     }
 
-    public void shutdownAllWorkers(boolean enableErrorOnTimeout) {
+    public void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot>
onErrorTimeout) {
         if (readState != null) {
-            readState.shutdownAllWorkers(enableErrorOnTimeout);
+            readState.shutdownAllWorkers(onWarnTimeout, onErrorTimeout);
         } else {
             try {
                 ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());

http://git-wip-us.apache.org/repos/asf/storm/blob/9f981e53/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
new file mode 100644
index 0000000..9662af5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UniFunc.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+public interface UniFunc<T> {
+    public void call(T arg) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9f981e53/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index a23351b..e67cfc7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -35,7 +35,7 @@ public class SupervisorHealthCheck implements Runnable {
         Map<String, Object> conf = supervisor.getConf();
         int healthCode = HealthCheck.healthCheck(conf);
         if (healthCode != 0) {
-            supervisor.shutdownAllWorkers(true);
+            supervisor.shutdownAllWorkers(null, null);
         }
     }
 }


Mime
View raw message