storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: STORM-794 Modify Spout async loop to treat deactivate ASAP
Date Thu, 12 Nov 2015 15:49:54 GMT
Repository: storm
Updated Branches:
  refs/heads/master 4eccb151b -> 205794363


STORM-794 Modify Spout async loop to treat deactivate ASAP


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e40c9a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e40c9a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e40c9a6

Branch: refs/heads/master
Commit: 3e40c9a695c63cadd526c6e4fb365bda6ff66660
Parents: 34cda0d
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Thu Nov 12 08:48:39 2015 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Thu Nov 12 08:48:39 2015 +0900

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 43 +++++++++++---------
 1 file changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3e40c9a6/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 461f8d6..0390987 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -622,25 +622,28 @@
                 reached-max-spout-pending (and max-spout-pending
                                                (>= (.size pending) max-spout-pending))
                 ]
-            (if (and (not (.isFull transfer-queue))
-                     (not throttle-on)
-                     (not reached-max-spout-pending))
-              (if active?
-                (do
-                  (when-not @last-active
-                    (reset! last-active true)
-                    (log-message "Activating spout " component-id ":" (keys task-datas))
-                    (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-               
-                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
-                (do
-                  (when @last-active
-                    (reset! last-active false)
-                    (log-message "Deactivating spout " component-id ":" (keys task-datas))
-                    (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-                  ;; TODO: log that it's getting throttled
-                  (Time/sleep 100)
-                  (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data)
(:stats executor-data)))))
+            (if active?
+              ; activated
+              (do
+                (when-not @last-active
+                  (reset! last-active true)
+                  (log-message "Activating spout " component-id ":" (keys task-datas))
+                  (fast-list-iter [^ISpout spout spouts] (.activate spout)))
+
+                (if (and (not (.isFull transfer-queue))
+                      (not throttle-on)
+                      (not reached-max-spout-pending))
+                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
+              ; deactivated
+              (do
+                (when @last-active
+                  (reset! last-active false)
+                  (log-message "Deactivating spout " component-id ":" (keys task-datas))
+                  (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
+                ;; TODO: log that it's getting throttled
+                (Time/sleep 100)
+                (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data)
(:stats executor-data))))
+
             (if (and (= curr-count (.get emitted-count)) active?)
               (do (.increment empty-emit-streak)
                   (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
@@ -650,7 +653,7 @@
                     (if reached-max-spout-pending
                       (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data)
(:stats executor-data)))))
               (.set empty-emit-streak 0)
-              ))           
+              ))
           0))
       :kill-fn (:report-error-and-die executor-data)
       :factory? true


Mime
View raw message