storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/10] storm git commit: STORM-1155. Supervisor recurring health checks
Date Wed, 11 Nov 2015 21:24:06 GMT
Repository: storm
Updated Branches:
  refs/heads/master 2b6884b30 -> 74b0a4f68


STORM-1155.  Supervisor recurring health checks


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5bd5cf95
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5bd5cf95
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5bd5cf95

Branch: refs/heads/master
Commit: 5bd5cf958943007dfe1742f6d4adda8f2a0b75ee
Parents: 8ba776b
Author: Thomas Graves <tgraves@decadefade.corp.ne1.yahoo.com>
Authored: Mon Nov 2 22:12:34 2015 +0000
Committer: Thomas Graves <tgraves@decadefade.corp.ne1.yahoo.com>
Committed: Mon Nov 2 22:12:34 2015 +0000

----------------------------------------------------------------------
 bin/storm.py                                    | 13 ++-
 conf/defaults.yaml                              |  2 +
 .../documentation/Setting-up-a-Storm-cluster.md | 16 ++++
 .../clj/backtype/storm/command/healthcheck.clj  | 88 ++++++++++++++++++++
 .../clj/backtype/storm/daemon/supervisor.clj    | 13 ++-
 storm-core/src/jvm/backtype/storm/Config.java   | 13 +++
 6 files changed, 143 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5bd5cf95/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 1b9617e..4798ade 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -381,6 +381,17 @@ def get_errors(*args):
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
 
+def healthcheck(*args):
+    """Syntax: [storm node-health-check]
+
+    Run health checks on the local supervisor.
+    """
+    exec_storm_class(
+        "backtype.storm.command.healthcheck",
+        args=args,
+        jvmtype="-client",
+        extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
+
 def kill_workers(*args):
     """Syntax: [storm kill_workers]
 
@@ -610,7 +621,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus,
"ui": ui
             "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help":
print_usage,
             "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor":
monitor,
             "upload-credentials": upload_credentials, "get-errors": get_errors, "set_log_level":
set_log_level,
-            "kill_workers": kill_workers }
+            "kill_workers": kill_workers, "node-health-check": healthcheck}
 
 def parse_config(config_list):
     global CONFIG_OPTS

http://git-wip-us.apache.org/repos/asf/storm/blob/5bd5cf95/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 160c29f..c9213bd 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -51,6 +51,8 @@ storm.auth.simple-acl.users.commands: []
 storm.auth.simple-acl.admins: []
 storm.meta.serialization.delegate: "backtype.storm.serialization.GzipThriftSerializationDelegate"
 storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
+storm.health.check.dir: "healthchecks"
+storm.health.check.timeout.ms: 5000
 
 ### nimbus.* configs are for the master
 nimbus.seeds : ["localhost"]

http://git-wip-us.apache.org/repos/asf/storm/blob/5bd5cf95/docs/documentation/Setting-up-a-Storm-cluster.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Setting-up-a-Storm-cluster.md b/docs/documentation/Setting-up-a-Storm-cluster.md
index 9fabb27..d38cc6c 100644
--- a/docs/documentation/Setting-up-a-Storm-cluster.md
+++ b/docs/documentation/Setting-up-a-Storm-cluster.md
@@ -80,6 +80,22 @@ supervisor.slots.ports:
     - 6703
 ```
 
+### Monitoring Health of Supervisors
+
+Storm provides a mechanism by which administrators can configure the supervisor to run administrator-supplied
scripts periodically to determine if a node is healthy or not. Administrators can
have the supervisor determine if the node is in a healthy state by performing any checks of
their choice in scripts located in storm.health.check.dir. If a script detects the node to
be in an unhealthy state, it must print a line to standard output beginning with the string
ERROR. The supervisor will periodically run the scripts in the health check dir and check
the output. If a script’s output contains a line beginning with the string ERROR, as described above, the supervisor
will shut down any workers and exit.
+
+The health check directory location can be configured with:
+
+```yaml
+storm.health.check.dir: "healthchecks"
+```
+
+The time to allow any given healthcheck script to run before it is marked failed due to timeout
can be configured with:
+
+```yaml
+storm.health.check.timeout.ms: 5000
+```
+
 ### Configure external libraries and environmental variables (optional)
 
 If you need support from external libraries or custom plugins, you can place such jars into
the extlib/ and extlib-daemon/ directories. Note that the extlib-daemon/ directory stores
jars used only by daemons (Nimbus, Supervisor, DRPC, UI, Logviewer), e.g., HDFS and customized
scheduling libraries. Accordingly, two environmental variables STORM_EXT_CLASSPATH and STORM_EXT_CLASSPATH_DAEMON
can be configured by users for including the external classpath and daemon-only external classpath.

http://git-wip-us.apache.org/repos/asf/storm/blob/5bd5cf95/storm-core/src/clj/backtype/storm/command/healthcheck.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/command/healthcheck.clj b/storm-core/src/clj/backtype/storm/command/healthcheck.clj
new file mode 100644
index 0000000..070e9ff
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/command/healthcheck.clj
@@ -0,0 +1,88 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns backtype.storm.command.healthcheck
+  (:require [backtype.storm
+             [config :refer :all]
+             [log :refer :all]]
+            [clojure.java [io :as io]]
+            [clojure [string :refer [split]]])
+  (:gen-class))
+
+(defn interrupter
+  "Interrupt a given thread after ms milliseconds."
+  [thread ms]
+  (let [interrupter (Thread.
+                     (fn []
+                       (try
+                         (Thread/sleep ms)
+                         (.interrupt thread)
+                         (catch InterruptedException e))))]
+    (.start interrupter)
+    interrupter))
+
+(defn check-output [lines]
+  (if (some #(.startsWith % "ERROR") lines)
+    :failed
+    :success))
+
+(defn process-script [conf script]
+  (let [script-proc (. (Runtime/getRuntime) (exec script))
+        curthread (Thread/currentThread)
+        interrupter-thread (interrupter curthread
+                                        (conf STORM-HEALTH-CHECK-TIMEOUT-MS))]
+    (try
+      (.waitFor script-proc)
+      (.interrupt interrupter-thread)
+      (if (not (= (.exitValue script-proc) 0))
+        :failed_with_exit_code
+        (check-output (split
+                       (slurp (.getInputStream script-proc))
+                       #"\n+")))
+      (catch InterruptedException e
+        (println "Script" script "timed out.")
+        :timeout)
+      (catch Exception e
+        (println "Script failed with exception: " e)
+        :failed_with_exception)
+      (finally (.interrupt interrupter-thread)))))
+
+(defn health-check [conf]
+  (let [health-dir (conf STORM-HEALTH-CHECK-DIR)
+        health-files (file-seq (io/file health-dir))
+        health-scripts (filter #(and (.canExecute %)
+                                     (not (.isDirectory %)))
+                               health-files)
+        results (->> health-scripts
+                     (map #(.getAbsolutePath %))
+                     (map (partial process-script conf)))]
+    (log-message
+     (pr-str (map #'vector
+                  (map #(.getAbsolutePath %) health-scripts)
+                  results)))
+    ; failed_with_exit_code is OK. We're mimicking Hadoop's health checks.
+    ; We treat non-zero exit codes as indicators that the scripts failed
+    ; to execute properly, not that the system is unhealthy, in which case
+    ; we don't want to start killing things.
+    (if (every? #(or (= % :failed_with_exit_code)
+                     (= % :success))
+                results)
+      0
+      1)))
+
+(defn -main [& args]
+  (let [conf (read-storm-config)]
+    (System/exit
+     (health-check conf))))

http://git-wip-us.apache.org/repos/asf/storm/blob/5bd5cf95/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 52b2057..64cbfd6 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -28,6 +28,7 @@
   (:import [backtype.storm Config])
   (:import [backtype.storm.generated WorkerResources])
   (:use [backtype.storm.daemon common])
+  (:require [backtype.storm.command [healthcheck :as healthcheck]])
   (:require [backtype.storm.daemon [worker :as worker]]
             [backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as
event]]
             [clojure.set :as set])
@@ -545,7 +546,17 @@
       (schedule-recurring (:event-timer supervisor)
                           0
                           (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
-                          (fn [] (.add processes-event-manager sync-processes))))
+                          (fn [] (.add processes-event-manager sync-processes)))
+      (schedule-recurring (:event-timer supervisor)
+                          (* 60 5)
+                          (* 60 5)
+                          (fn [] (let [health-code (healthcheck/health-check conf)
+                                       ids (my-worker-ids conf)]
+                                   (if (not (= health-code 0))
+                                     (do
+                                       (doseq [id ids]
+                                         (shutdown-worker supervisor id))
+                                       (throw (RuntimeException. "Supervisor failed health
check. Exiting."))))))))
     (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname
supervisor))
     (reify
      Shutdownable

http://git-wip-us.apache.org/repos/asf/storm/blob/5bd5cf95/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 73e41cb..609a9f7 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -329,6 +329,19 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_ID = "storm.id";
 
     /**
+     * The directory where storm's health scripts go.
+     */
+    public static final String STORM_HEALTH_CHECK_DIR = "storm.health.check.dir";
+    public static final Object STORM_HEALTH_CHECK_DIR_SCHEMA = String.class;
+
+    /**
+     * The time to allow any given healthcheck script to run before it
+     * is marked failed due to timeout
+     */
+    public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = "storm.health.check.timeout.ms";
+    public static final Object STORM_HEALTH_CHECK_TIMEOUT_MS_SCHEMA = Number.class;
+
+    /**
      * The number of times to retry a Nimbus operation.
      */
     @isNumber


Mime
View raw message