storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [04/24] git commit: Do the worker HB timeout check when HB's are updated
Date Tue, 19 Nov 2013 15:26:41 GMT
Do the worker HB timeout check when HB's are updated


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/edbb17cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/edbb17cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/edbb17cb

Branch: refs/heads/master
Commit: edbb17cb76d3e3790a4c9ca52332fa5638c37bd9
Parents: 1e3d266
Author: Derek Dagit <derekd@yahoo-inc.com>
Authored: Thu Oct 10 16:07:37 2013 -0500
Committer: Derek Dagit <derekd@yahoo-inc.com>
Committed: Thu Oct 10 16:07:37 2013 -0500

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/edbb17cb/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 04731dc..caac996 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -326,7 +326,7 @@
 ;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
 ;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
 ;; tracked through heartbeat-cache
-(defn- update-executor-cache [curr hb]
+(defn- update-executor-cache [curr hb timeout]
   (let [reported-time (:time-secs hb)
         {last-nimbus-time :nimbus-time
          last-reported-time :executor-reported-time} curr
@@ -338,15 +338,18 @@
                       (current-time-secs)
                       last-nimbus-time
                       )]
-      {:nimbus-time nimbus-time
+      {:is-timed-out (and
+                       nimbus-time
+                       (>= (time-delta nimbus-time) timeout))
+       :nimbus-time nimbus-time
        :executor-reported-time reported-time}))
 
-(defn update-heartbeat-cache [cache executor-beats all-executors]
+(defn update-heartbeat-cache [cache executor-beats all-executors timeout]
   (let [cache (select-keys cache all-executors)]
     (into {}
       (for [executor all-executors :let [curr (cache executor)]]
         [executor
-         (update-executor-cache curr (get executor-beats executor))]
+         (update-executor-cache curr (get executor-beats executor) timeout)]
          ))))
 
 (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
@@ -355,7 +358,8 @@
         executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port
existing-assignment))
         cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
                                       executor-beats
-                                      all-executors)]
+                                      all-executors
+                                      ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))]
       (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
 
 (defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
@@ -380,14 +384,12 @@
     (->> all-executors
         (filter (fn [executor]
           (let [start-time (get executor-start-times executor)
-                nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
+                is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]
             (if (and start-time
                    (or
                     (< (time-delta start-time)
                        (conf NIMBUS-TASK-LAUNCH-SECS))
-                    (not nimbus-time)
-                    (< (time-delta nimbus-time)
-                       (conf NIMBUS-TASK-TIMEOUT-SECS))
+                    (not is-timed-out)
                     ))
               true
               (do


Mime
View raw message