storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/2] storm git commit: STORM-2853 Initialize tick tuple after initializing spouts/bolts
Date Fri, 02 Feb 2018 04:19:09 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch d4247d759 -> ca25384c5


STORM-2853 Initialize tick tuple after initializing spouts/bolts

* this prevents a newly initializing executor in a deactivated topology from showing high CPU usage
* this relies on the fact that all tasks in an executor belong to the same component


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91f15228
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91f15228
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91f15228

Branch: refs/heads/1.x-branch
Commit: 91f1522820f3ac1c53594e943dfa8f3a2cde1a8f
Parents: 34a220c
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Thu Feb 1 10:08:48 2018 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Thu Feb 1 10:08:48 2018 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/executor.clj | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/91f15228/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index b9bcaae..3940f1b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -350,7 +350,7 @@
      (when (seq data-points)
        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
 
-(defn setup-ticks! [worker executor-data]
+(defn setup-ticks! [executor-data]
   (let [storm-conf (:storm-conf executor-data)
         comp-id (:component-id executor-data)
         tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
@@ -362,7 +362,7 @@
                    (= :spout (:type executor-data))))
         (log-message "Timeouts disabled for executor " comp-id ":" (:executor-id executor-data))
         (schedule-recurring
-          (:user-timer worker)
+          (:user-timer (:worker executor-data))
           tick-time-secs
           tick-time-secs
           (fn []
@@ -390,14 +390,13 @@
               (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
               (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
 
-        ;; starting the batch-transfer->worker ensures that anything publishing to that queue 
+        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
         ;; doesn't block (because it's a single threaded queue and the caching/consumer started
         ;; trick isn't thread-safe)
         system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
         handlers (with-error-reaction report-error-and-die
                    (mk-threads executor-data task-datas initial-credentials))
-        threads (concat handlers system-threads)]    
-    (setup-ticks! worker executor-data)
+        threads (concat handlers system-threads)]
 
     (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
     ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
@@ -627,6 +626,7 @@
                       )))))
         (reset! open-or-prepare-was-called? true) 
         (log-message "Opened spout " component-id ":" (keys task-datas))
+        (setup-ticks! executor-data)
         (setup-metrics! executor-data)
         
         (fn []
@@ -852,6 +852,7 @@
                          )))))
         (reset! open-or-prepare-was-called? true)        
         (log-message "Prepared bolt " component-id ":" (keys task-datas))
+        (setup-ticks! executor-data)
         (setup-metrics! executor-data)
 
         (let [receive-queue (:receive-queue executor-data)


Mime
View raw message