storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-1742 More accurate 'complete latency'
Date Fri, 08 Jul 2016 07:57:32 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 787dda348 -> 89d590a64


STORM-1742 More accurate 'complete latency'

* Acker computes 'complete latency' and sends back to Spout
  * start time: Acker receiving ACK_INIT from Spout
  * end time: Acker receiving ACK message which make the tuple tree completed
* When Acker provides complete latency, Spout just uses this value to record that latency
  * exception case: tuple timed out - Spout doesn't receive failed information from Acker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d5ee31ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d5ee31ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d5ee31ca

Branch: refs/heads/1.x-branch
Commit: d5ee31ca65e50e24ce3a2ba71132a64997753efe
Parents: f9d561a
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Thu Apr 28 11:25:51 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Jun 17 18:47:06 2016 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/acker.clj   | 13 +++++++------
 storm-core/src/clj/org/apache/storm/daemon/common.clj  |  6 +++---
 .../src/clj/org/apache/storm/daemon/executor.clj       |  3 ++-
 3 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d5ee31ca/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 7c29f46..012a8e3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.acker
   (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
-  (:import [org.apache.storm.tuple Tuple Fields])
+  (:import [org.apache.storm.tuple Tuple])
   (:import [org.apache.storm.utils RotatingMap MutableObject])
   (:import [java.util List Map])
   (:import [org.apache.storm Constants])
@@ -60,7 +60,8 @@
                        curr (condp = stream-id
                                 ACKER-INIT-STREAM-ID (-> curr
                                                          (update-ack (.getValue tuple 1))
-                                                         (assoc :spout-task (.getValue tuple
2)))
+                                                         (assoc :spout-task (.getValue tuple
2))
+                                                         (assoc :start-time (System/currentTimeMillis)))
                                 ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
                                 ACKER-FAIL-STREAM-ID (assoc curr :failed true)
                                 ACKER-RESET-TIMEOUT-STREAM-ID curr)]
@@ -72,7 +73,7 @@
                              (acker-emit-direct output-collector
                                                 (:spout-task curr)
                                                 ACKER-ACK-STREAM-ID
-                                                [id]
+                                                [id (time-delta-ms (:start-time curr))]
                                                 ))
                            (:failed curr)
                            (do
@@ -80,13 +81,13 @@
                              (acker-emit-direct output-collector
                                                 (:spout-task curr)
                                                 ACKER-FAIL-STREAM-ID
-                                                [id]
+                                                [id (time-delta-ms (:start-time curr))]
                                                 ))
                            (= stream-id ACKER-RESET-TIMEOUT-STREAM-ID)
                            (acker-emit-direct output-collector
                                               (:spout-task curr)
                                               ACKER-RESET-TIMEOUT-STREAM-ID
-                                              [id]
+                                              [id (time-delta-ms (:start-time curr))]
                                               )
                            ))
                    (.ack output-collector tuple)
@@ -112,4 +113,4 @@
 (defn -cleanup [this]
   (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
     (.cleanup delegate)
-    ))
+    ))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d5ee31ca/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 68f58b0..017abbd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -222,9 +222,9 @@
   (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS)
(storm-conf TOPOLOGY-ACKER-EXECUTORS))
         acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
                                          (new org.apache.storm.daemon.acker)
-                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields
["id"])
-                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields
["id"])
-                                          ACKER-RESET-TIMEOUT-STREAM-ID (thrift/direct-output-fields
["id"])
+                                         {ACKER-ACK-STREAM-ID (thrift/direct-output-fields
["id" "time-delta-ms"])
+                                          ACKER-FAIL-STREAM-ID (thrift/direct-output-fields
["id" "time-delta-ms"])
+                                          ACKER-RESET-TIMEOUT-STREAM-ID (thrift/direct-output-fields
["id" "time-delta-ms"])
                                           }
                                          :p num-executors
                                          :conf {TOPOLOGY-TASKS num-executors

http://git-wip-us.apache.org/repos/asf/storm/blob/d5ee31ca/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index d50e494..90f5f86 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -528,11 +528,12 @@
                                    (when pending-for-id
                                      (.put pending id pending-for-id))) 
                               (let [id (.getValue tuple 0)
+                                    time-delta-ms (.getValue tuple 1)
                                     [stored-task-id spout-id tuple-finished-info start-time-ms]
(.remove pending id)]
                                 (when spout-id
                                   (when-not (= stored-task-id task-id)
                                     (throw-runtime "Fatal error, mismatched task ids: " task-id
" " stored-task-id))
-                                  (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
+                                  (let [time-delta (if start-time-ms time-delta-ms)]
                                     (condp = stream-id
                                       ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get
task-datas task-id)
                                                                          spout-id tuple-finished-info
time-delta id debug?)


Mime
View raw message