storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: Adding backpressure timeout, backpressure znodes cleanup, Do not delete backpressure ephemeral node frequently
Date Thu, 01 Feb 2018 14:36:04 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 49c2fc39f -> 33f543cf6


Adding backpressure timeout, backpressure znodes cleanup, Do not delete backpressure ephemeral node frequently


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd04a556
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd04a556
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd04a556

Branch: refs/heads/1.x-branch
Commit: dd04a5563317fa6f57d3d7ec32190940b98454d7
Parents: 58ae04b
Author: Kishor Patil <kpatil@yahoo-inc.com>
Authored: Mon Jan 22 15:47:42 2018 -0500
Committer: Kishor Patil <kpatil@yahoo-inc.com>
Committed: Mon Jan 22 15:47:52 2018 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +
 storm-core/src/clj/org/apache/storm/cluster.clj | 63 ++++++++++++++----
 .../src/clj/org/apache/storm/daemon/worker.clj  | 68 ++++++++++++--------
 storm-core/src/jvm/org/apache/storm/Config.java | 17 +++++
 .../test/clj/org/apache/storm/cluster_test.clj  | 15 +++++
 5 files changed, 123 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f89211b..2bd7855 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -194,6 +194,8 @@ task.backpressure.poll.secs: 30
 topology.backpressure.enable: false
 backpressure.disruptor.high.watermark: 0.9
 backpressure.disruptor.low.watermark: 0.4
+backpressure.znode.timeout.secs: 30
+backpressure.znode.update.freq.secs: 15
 
 zmq.threads: 1
 zmq.linger.millis: 5000

http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 810b3c3..eafa40b 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -18,7 +18,8 @@
   (:import [org.apache.zookeeper.data Stat ACL Id]
            [org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
             LogConfig ProfileAction ProfileRequest NodeInfo]
-           [java.io Serializable])
+           [java.io Serializable]
+           [java.nio ByteBuffer])
   (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
   (:import [org.apache.curator.framework CuratorFramework])
   (:import [org.apache.storm.utils Utils])
@@ -80,7 +81,7 @@
   (remove-worker-heartbeat! [this storm-id node port])
   (supervisor-heartbeat! [this supervisor-id info])
   (worker-backpressure! [this storm-id node port info])
-  (topology-backpressure [this storm-id callback])
+  (topology-backpressure [this storm-id timeout-ms callback])
   (setup-backpressure! [this storm-id])
   (remove-backpressure! [this storm-id])
   (remove-worker-backpressure! [this storm-id node port])
@@ -172,6 +173,10 @@
   [storm-id node port]
   (str (backpressure-storm-root storm-id) "/" node "-" port))
 
+(defn backpressure-full-path
+  [storm-id short-path]
+  (str (backpressure-storm-root storm-id) "/" short-path))
+
 (defn error-storm-root
   [storm-id]
   (str ERRORS-SUBTREE "/" storm-id))
@@ -242,6 +247,20 @@
                       :stats (get executor-stats t)}})))
          (into {}))))
 
+
+(defn max-timestamp
+  "Reduces the timestamps (e.g. those set by worker-backpressure!)
+  to the most recent timestamp"
+  [cluster-state storm-id paths]
+  (reduce (fn [acc path]
+            (let [data (.get_data cluster-state (backpressure-full-path storm-id path) false)
+                  timestamp (if data
+                              (.. (ByteBuffer/wrap data) (getLong))
+                              0)]
+              (Math/max acc timestamp)))
+          0
+          paths))
+
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 (defnk mk-storm-cluster-state
   [cluster-state-spec :acls nil :context (ClusterStateContext.)]
@@ -483,27 +502,37 @@
             (log-warn-error e "Could not teardown heartbeats for " storm-id))))
 
       (worker-backpressure!
-        [this storm-id node port on?]
-        "if znode exists and to be not on?, delete; if exists and on?, do nothing;
-        if not exists and to be on?, create; if not exists and not on?, do nothing"
+        [this storm-id node port timestamp]
+        "If znode exists and timestamp is non-positive, ignore;
+         if exists and timestamp is larger than 0, update the timestamp;
+         if not exists and timestamp is larger than 0, create the znode and set the timestamp;
+         if not exists and timestamp is non-positive, do nothing."
         (let [path (backpressure-path storm-id node port)
               existed (.node_exists cluster-state path false)]
           (if existed
-            (if (not on?)
-              (.delete_node cluster-state path))   ;; delete the znode since the worker is not congested
-            (if on?
-              (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested
+            (if-not (<= timestamp 0)
+              (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))]
+                (.set_data cluster-state path bytes acls)))
+            (when timestamp
+              (let [bytes (.. (ByteBuffer/allocate (Long/BYTES)) (putLong timestamp) (array))]
+                (.set_ephemeral_node cluster-state path bytes acls)))))) ;; create the znode since worker is congested
     
       (topology-backpressure
-        [this storm-id callback]
+        [this storm-id timeout-ms callback]
         "if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
+         But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
+         This will prevent the spouts from getting stuck indefinitely if something wrong happens.
          The backpressure/storm-id dir may not exist if nimbus has shutdown the topology"
         (when callback
           (swap! backpressure-callback assoc storm-id callback))
         (let [path (backpressure-storm-root storm-id)
               children (if (.node_exists cluster-state path false)
-                         (.get_children cluster-state path (not-nil? callback))) ]
-              (> (count children) 0)))
+                         (.get_children cluster-state path (not-nil? callback)))
+              most-recent-backpressure (max-timestamp cluster-state storm-id children)
+              current-time (System/currentTimeMillis)
+              ret (> timeout-ms (- current-time most-recent-backpressure))]
+          (log-debug "topology backpressure is " (if ret "on" "off"))
+          ret))
       
       (setup-backpressure!
         [this storm-id]
@@ -511,14 +540,20 @@
 
       (remove-backpressure!
         [this storm-id]
-        (.delete_node cluster-state (backpressure-storm-root storm-id)))
+        (try-cause
+          (.delete_node cluster-state (backpressure-storm-root storm-id))
+          (catch KeeperException e
+            (log-warn-error e "Could not teardown backpressure for " storm-id))))
 
       (remove-worker-backpressure!
         [this storm-id node port]
         (let [path (backpressure-path storm-id node port)
               existed (.node_exists cluster-state path false)]
           (if existed
-            (.delete_node cluster-state (backpressure-path storm-id node port)))))
+            (try-cause
+              (.delete_node cluster-state (backpressure-path storm-id node port))
+              (catch KeeperException e
+                (log-warn-error e "Could not teardown backpressure for " storm-id))))))
     
       (teardown-topology-errors!
         [this storm-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 6626272..633a61d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -131,30 +131,39 @@
     (let [tuple (.getTuple addressed-tuple)]
       (.serialize serializer tuple))))
 
-(defn- mk-backpressure-handler [executors]
-  "make a handler that checks and updates worker's backpressure flag"
-  (disruptor/worker-backpressure-handler
-    (fn [worker]
-      (let [storm-id (:storm-id worker)
-            assignment-id (:assignment-id worker)
-            port (:port worker)
-            storm-cluster-state (:storm-cluster-state worker)
-            prev-backpressure-flag @(:backpressure worker)
-            ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
-            curr-backpressure-flag (if executors
-                                     (or (.getThrottleOn (:transfer-queue worker))
-                                       (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors)))
-                                     prev-backpressure-flag)]
-        ;; update the worker's backpressure flag to zookeeper only when it has changed
-        (when (not= prev-backpressure-flag curr-backpressure-flag)
-          (try
-            (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag)
-            (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-flag)
-            ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
-            (reset! (:backpressure worker) curr-backpressure-flag)
-            (catch Exception exc
-              (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))
-        ))))
+(defn should-trigger-backpressure [executors worker]
+  (or (.getThrottleOn (:transfer-queue worker))
+      (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))))
+
+(defn- mk-backpressure-handler [executors topo-conf]
+  "make a handler that checks and updates worker's backpressure timestamp"
+  (let [update-freq-ms (* (topo-conf BACKPRESSURE-ZNODE-UPDATE-FREQ-SECS) 1000)]
+    (disruptor/worker-backpressure-handler
+      (if executors
+        (fn [worker]
+          (let [storm-id (:storm-id worker)
+                assignment-id (:assignment-id worker)
+                port (:port worker)
+                storm-cluster-state (:storm-cluster-state worker)
+                prev-backpressure-timestamp @(:backpressure worker)
+                curr-timestamp (System/currentTimeMillis)
+                ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on
+                curr-backpressure-timestamp (if (should-trigger-backpressure executors worker)
+                                              ;; Update the backpressure timestamp every update-freq-ms seconds
+                                              (if (> (- curr-timestamp (or prev-backpressure-timestamp 0)) update-freq-ms)
+                                                curr-timestamp
+                                                prev-backpressure-timestamp)
+                                              0)]
+            ;; update the worker's backpressure timestamp to zookeeper only when it has changed
+            (when (not= prev-backpressure-timestamp curr-backpressure-timestamp)
+              (try
+                (log-debug "worker backpressure timestamp changing from " prev-backpressure-timestamp " to " curr-backpressure-timestamp)
+                (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-timestamp)
+                ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+                (reset! (:backpressure worker) curr-backpressure-timestamp)
+                (catch Exception exc
+                  (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry"))))))
+        (fn [workers])))))
 
 (defn- mk-disruptor-backpressure-handler [worker]
   "make a handler for the worker's send disruptor queue to
@@ -317,7 +326,7 @@
       :transfer-fn (mk-transfer-fn <>)
       :load-mapping (LoadMapping.)
       :assignment-versions assignment-versions
-      :backpressure (atom false) ;; whether this worker is going slow
+      :backpressure (atom 0) ;; whether this worker is going slow. non-positive means turning off backpressure
       :backpressure-trigger (Object.) ;; a trigger for synchronization with executors
       :throttle-on (atom false) ;; whether throttle is activated for spouts
       )))
@@ -647,15 +656,18 @@
         _ (-> (.setHighWaterMark (:transfer-queue worker) ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
               (.setLowWaterMark ((:storm-conf worker) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
               (.setEnableBackpressure ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)))
-        backpressure-handler (mk-backpressure-handler @executors)        
+        backpressure-handler (mk-backpressure-handler @executors storm-conf)
         backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler)
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) 
             (.start backpressure-thread))
+        ;; this callback is registered as a zk watch on topology's backpressure directory
+        ;; which makes sure that the topology's backpressure status is updated to the worker's throttle-on
+        backpressure-znode-timeout-ms (* (storm-conf BACKPRESSURE-ZNODE-TIMEOUT-SECS) 1000)
         topology-backpressure-callback (fn cb [& ignored]
-                   (let [throttle-on (.topology-backpressure storm-cluster-state storm-id cb)]
+                   (let [throttle-on (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms cb)]
                       (reset! (:throttle-on worker) throttle-on)))
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
-            (.topology-backpressure storm-cluster-state storm-id topology-backpressure-callback))
+            (.topology-backpressure storm-cluster-state storm-id backpressure-znode-timeout-ms topology-backpressure-callback))
 
         shutdown* (fn []
                     (log-message "Shutting down worker " storm-id " " assignment-id " " port)

http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6b0c868..11f980e 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1561,6 +1561,23 @@ public class Config extends HashMap<String, Object> {
     public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
 
     /**
+     * How long until the backpressure znode is invalid.
+     * It's measured by the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc.
+     * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs";
+
+    /**
+     * How often will the data (timestamp) of backpressure znode be updated.
+     * But if the worker backpressure status (on/off) changes, the znode will be updated anyway.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs";
+
+    /**
      * A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format).
      * Each listed class will be routed cluster related metrics data.
      * Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus.

http://git-wip-us.apache.org/repos/asf/storm/blob/dd04a556/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 55b686e..cb5f064 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -319,3 +319,18 @@
       (mk-storm-cluster-state {})
       (verify-call-times-for mk-distributed-cluster-state 1)
       (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil))))
+
+(deftest test-cluster-state-backpressure
+         (testing "Test that we can get topology backpressure."
+                  (stubbing [zk/mkdirs nil
+                             zk/mk-client (reify CuratorFramework (^void close [this] nil))
+                             mk-distributed-cluster-state (reify ClusterState
+                                                                 (get_data [this path watch?] (byte-array 10))
+                                                                 (register [this callback] nil)
+                                                                 (mkdirs [this path acls] nil)
+                                                                 (node_exists [this path watch?]
+                                                                              (log-message "Running node_exists.") true)
+                                                                 (get_children [this path watch?] '("/foo/bar")))]
+                            (let [cluster-state (mk-storm-cluster-state {})]
+                                 (.get_data (mk-distributed-cluster-state) "/foo/bar" false)
+                                 (topology-backpressure cluster-state "" 30 nil)))))


Mime
View raw message