storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [13/24] git commit: change log format
Date Tue, 01 Jul 2014 20:40:44 GMT
change log format


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4d6a27f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4d6a27f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4d6a27f4

Branch: refs/heads/master
Commit: 4d6a27f466b942cd011efb0ffd538f0c04daf047
Parents: 0427e06
Author: JuDasheng <judasheng@meituan.com>
Authored: Thu Jun 12 17:46:08 2014 +0800
Committer: JuDasheng <judasheng@meituan.com>
Committed: Thu Jun 12 17:46:08 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  | 67 ++++++++++++++++----
 .../src/clj/backtype/storm/daemon/task.clj      |  4 +-
 2 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4d6a27f4/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 1bbe53d..64e60be 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -18,6 +18,7 @@
   (:use [backtype.storm bootstrap])
   (:import [backtype.storm.hooks ITaskHook])
   (:import [backtype.storm.tuple Tuple])
+  (:import [backtype.storm.tuple MessageId])
   (:import [backtype.storm.spout ISpoutWaitStrategy])
   (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
             EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -389,12 +390,25 @@
   (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
         task-ids (:task-ids executor-data)
         debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
+        component-id (:component-id executor-data)
+        executor-id (:executor-id executor-data)
+        executor-type (:type executor-data)
         ]
     (disruptor/clojure-handler
       (fn [tuple-batch sequence-id end-of-batch?]
         (fast-list-iter [[task-id msg] tuple-batch]
-          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer
msg))]
-            (when debug? (log-message "Processing received message " tuple))
+          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer
msg))
+                tuple-streamid (.getSourceStreamId tuple)
+                tuple-source (.getSourceComponent tuple)
+                tuple-id (.getMessageId tuple)
+                tuple-values (.getValues tuple)
+                ]
+            (when debug? 
+              (if (= tuple-streamid "default")
+                (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid
"] Source[" tuple-source "] TupleId[" tuple-id "] TupleValue[" tuple-values "]")
+                (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid
"] Source[" tuple-source "] TupleId[" tuple-values "]")
+                )
+              )
             (if task-id
               (tuple-action-fn task-id tuple)
               ;; null task ids are broadcast tuples
@@ -421,6 +435,7 @@
         last-active (atom false)        
         spouts (ArrayList. (map :object (vals task-datas)))
         rand (Random. (Utils/secureRandomLong))
+        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
         
         pending (RotatingMap.
                  2 ;; microoptimize for performance of .size method
@@ -428,9 +443,12 @@
                    (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
                      (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                        (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info
time-delta)
+                       (when debug? 
+                         (log-message "Component[" component-id "] FAILED-TUPLE reason[EXPIRED]
TupleId[" msg-id "] values[" tuple-info "]"))
                        ))))
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          (let [stream-id (.getSourceStreamId tuple)]
+                          (let [stream-id (.getSourceStreamId tuple)
+                                tuple-id (.getMessageId tuple)]
                             (condp = stream-id
                               Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
                               Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data
(get task-datas task-id) tuple)
@@ -441,10 +459,18 @@
                                     (throw-runtime "Fatal error, mismatched task ids: " task-id
" " stored-task-id))
                                   (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                                     (condp = stream-id
-                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get
task-datas task-id)
-                                                                         spout-id tuple-finished-info
time-delta)
-                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data
(get task-datas task-id)
-                                                                           spout-id tuple-finished-info
time-delta)
+                                      ACKER-ACK-STREAM-ID (do 
+                                                            (ack-spout-msg executor-data
(get task-datas task-id)
+                                                                            spout-id tuple-finished-info
time-delta)
+                                                            (when debug?
+                                                              (log-message "Component[" component-id
"] ACK-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
+                                                            )
+                                      ACKER-FAIL-STREAM-ID (do 
+                                                             (fail-spout-msg executor-data
(get task-datas task-id)
+                                                                              spout-id tuple-finished-info
time-delta)
+                                                             (when debug?
+                                                              (log-message "Component[" component-id
"] FAILED-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
+                                                             )
                                       )))
                                 ;; TODO: on failure, emit tuple to failure stream
                                 ))))
@@ -493,6 +519,8 @@
                                                            (transfer-fn out-task
                                                                         out-tuple
                                                                         overflow-buffer)
+                                                           (when debug? 
+                                                             (log-message "Component[" component-id
"] Type[EMIT] to Stream[" out-stream-id "] TupleId[" tuple-id "] values[" values "]"))
                                                            ))
                                          (if rooted?
                                            (do
@@ -598,6 +626,8 @@
         {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                 open-or-prepare-was-called?]} executor-data
         rand (Random. (Utils/secureRandomLong))
+        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
+
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           ;; synchronization needs to be done with a key provided by this
bolt, otherwise:
                           ;; spout 1 sends synchronization (s1), dies, same spout restarts
somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before
s1 -> lose the incremental update
@@ -660,7 +690,13 @@
                                                     (tasks-fn task stream values)
                                                     (tasks-fn stream values))]
                                     (fast-list-iter [t out-tasks]
-                                                    (let [anchors-to-ids (HashMap.)]
+                                                    (let [anchors-to-ids (HashMap.)
+                                                          out-tuple (TupleImpl. worker-context
+                                                                               values
+                                                                               task-id
+                                                                               stream
+                                                                               (MessageId/makeId
anchors-to-ids))
+                                                          ]
                                                       (fast-list-iter [^TupleImpl a anchors]
                                                                       (let [root-ids (->
a .getMessageId .getAnchorsToIds .keySet)]
                                                                         (when (pos? (count
root-ids))
@@ -669,12 +705,15 @@
                                                                             (fast-list-iter
[root-id root-ids]
                                                                                         
   (put-xor! anchors-to-ids root-id edge-id))
                                                                             ))))
-                                                      (transfer-fn t
-                                                                   (TupleImpl. worker-context
-                                                                               values
-                                                                               task-id
-                                                                               stream
-                                                                               (MessageId/makeId
anchors-to-ids)))))
+                                                      (transfer-fn t out-tuple)
+                                                      (when debug? 
+                                                        (if (= component-id "__acker")
+                                                          (log-message "Component[" component-id
"] Type[EMIT] to Stream[" stream "] TupleId[" (.get values 0) "]")
+                                                          (log-message "Component[" component-id
"] Type[EMIT] to Stream[" stream "] TupleId[" (.getMessageId out-tuple) "] values[" values
"]")
+                                                          )
+                                                        
+                                                        )
+                                                      ))
                                     (or out-tasks [])))]]
           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
           (if (= component-id Constants/SYSTEM_COMPONENT_ID)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4d6a27f4/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 3650150..29756a1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -131,7 +131,7 @@
         
     (fn ([^Integer out-task-id ^String stream ^List values]
           (when debug?
-            (log-message "Emitting direct: " out-task-id "; " component-id " " stream " "
values))
+            (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId["
values "]"))
           (let [target-component (.getComponentId worker-context out-task-id)
                 component->grouping (get stream->component->grouper stream)
                 grouping (get component->grouping target-component)
@@ -149,7 +149,7 @@
             ))
         ([^String stream ^List values]
            (when debug?
-             (log-message "Emitting: " component-id " " stream " " values))
+             (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId["
values "]"))
            (let [out-tasks (ArrayList.)]
              (fast-map-iter [[out-component grouper] (get stream->component->grouper
stream)]
                (when (= :direct grouper)


Mime
View raw message