storm-commits mailing list archives

From srihar...@apache.org
Subject [1/3] storm git commit: STORM-1719 Introduce REST API: Topology metric stats for stream
Date Tue, 05 Jul 2016 00:04:58 GMT
Repository: storm
Updated Branches:
  refs/heads/master 14c3a36d8 -> be677d274


STORM-1719 Introduce REST API: Topology metric stats for stream

* Path: /api/v1/topology/:id/metrics
* This API provides detailed metrics for topology
** shows metrics per component, which are aggregated by stream
* add documentation about new REST API
** please refer to docs/STORM-UI-REST-API.md for details
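
As an illustration of the path and query parameters described above, here is a minimal sketch of building request URLs for the new endpoint. The helper name `metrics_url` and the `base_url` argument are hypothetical; only the path and the `window` and `sys` parameters come from this commit.

```python
from urllib.parse import quote, urlencode


def metrics_url(base_url, topology_id, window=None, include_sys=False):
    """Build a URL for GET /api/v1/topology/:id/metrics.

    This helper is illustrative only; it is not part of Storm.
    """
    path = "/api/v1/topology/{}/metrics".format(quote(topology_id, safe=""))
    params = {}
    if window is not None:
        params["window"] = str(window)  # window duration in seconds
    if include_sys:
        params["sys"] = "1"  # documented default is 0
    query = "?" + urlencode(params) if params else ""
    return base_url.rstrip("/") + path + query
```

The resulting URL can be fetched with any HTTP client; leaving out both optional parameters requests the documented defaults (`:all-time` window, no sys stats).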


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04372334
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04372334
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04372334

Branch: refs/heads/master
Commit: 0437233415185011cbd9390990f86b8daeca6fa3
Parents: 55809d6
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Mon Jun 27 17:36:09 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Mon Jun 27 17:36:09 2016 +0900

----------------------------------------------------------------------
 docs/STORM-UI-REST-API.md                       | 276 +++++++++++++++++++
 storm-core/src/clj/org/apache/storm/ui/core.clj | 106 ++++++-
 2 files changed, 380 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/04372334/docs/STORM-UI-REST-API.md
----------------------------------------------------------------------
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index bbed956..a2fc39a 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -510,6 +510,282 @@ Sample response:
 }
 ```
 
+### /api/v1/topology/:id/metrics
+
+Returns detailed metrics for a topology. It shows metrics per component, aggregated by stream.
+
+|Parameter |Value   |Description  |
+|----------|--------|-------------|
+|id        |String (required)| Topology Id  |
+|window    |String. Default value :all-time| Window duration for metrics in seconds|
+|sys       |String. Values 1 or 0. Default value 0| Controls whether sys stats are included in the response|
+
+Response fields:
+
+|Field  |Value |Description|
+|---|---|---|
+|window    |String. Default value ":all-time" | window duration for metrics in seconds|
+|window-hint| String | window param value in "hh mm ss" format. Default value is "All Time"|
+|spouts| Array | Array of all the spout components in the topology|
+|spouts.id| String |Spout id|
+|spouts.emitted| Array | Array of all the output streams this spout emits messages on |
+|spouts.emitted.stream_id| String | Stream id for this stream |
+|spouts.emitted.value| Long | Number of messages emitted in given window|
+|spouts.transferred| Array | Array of all the output streams this spout transfers messages on |
+|spouts.transferred.stream_id| String | Stream id for this stream |
+|spouts.transferred.value| Long | Number of messages transferred in given window|
+|spouts.acked| Array | Array of all the output streams on which this spout receives acks for messages |
+|spouts.acked.stream_id| String | Stream id for this stream |
+|spouts.acked.value| Long | Number of messages acked in given window|
+|spouts.failed| Array | Array of all the output streams on which this spout receives fails for messages |
+|spouts.failed.stream_id| String | Stream id for this stream |
+|spouts.failed.value| Long | Number of messages failed in given window|
+|spouts.complete_ms_avg| Array | Array of all the output streams for which this spout reports complete latency |
+|spouts.complete_ms_avg.stream_id| String | Stream id for this stream |
+|spouts.complete_ms_avg.value| String (double value returned in String format) | Average total latency, in milliseconds, for processing a message|
+|bolts| Array | Array of all the bolt components in the topology|
+|bolts.id| String |Bolt id|
+|bolts.emitted| Array | Array of all the output streams this bolt emits messages on |
+|bolts.emitted.stream_id| String | Stream id for this stream |
+|bolts.emitted.value| Long | Number of messages emitted in given window|
+|bolts.transferred| Array | Array of all the output streams this bolt transfers messages on |
+|bolts.transferred.stream_id| String | Stream id for this stream |
+|bolts.transferred.value| Long | Number of messages transferred in given window|
+|bolts.acked| Array | Array of all the input streams on which this bolt acks messages |
+|bolts.acked.component_id| String | Component id for this stream |
+|bolts.acked.stream_id| String | Stream id for this stream |
+|bolts.acked.value| Long | Number of messages acked in given window|
+|bolts.failed| Array | Array of all the input streams on which this bolt fails messages |
+|bolts.failed.component_id| String | Component id for this stream |
+|bolts.failed.stream_id| String | Stream id for this stream |
+|bolts.failed.value| Long | Number of messages failed in given window|
+|bolts.process_ms_avg| Array | Array of all the input streams for which this bolt reports process latency |
+|bolts.process_ms_avg.component_id| String | Component id for this stream |
+|bolts.process_ms_avg.stream_id| String | Stream id for this stream |
+|bolts.process_ms_avg.value| String (double value returned in String format) | Average time, in milliseconds, for the bolt to ack a message after it was received|
+|bolts.executed| Array | Array of all the input streams on which this bolt executes messages |
+|bolts.executed.component_id| String | Component id for this stream |
+|bolts.executed.stream_id| String | Stream id for this stream |
+|bolts.executed.value| Long | Number of messages executed in given window|
+|bolts.executed_ms_avg| Array | Array of all the input streams for which this bolt reports execute latency |
+|bolts.executed_ms_avg.component_id| String | Component id for this stream |
+|bolts.executed_ms_avg.stream_id| String | Stream id for this stream |
+|bolts.executed_ms_avg.value| String (double value returned in String format) | Average time, in milliseconds, to run the execute method of the bolt|
+
+Examples:
+
+```no-highlight
+1. http://ui-daemon-host-name:8080/api/v1/topology/WordCount3-1-1402960825/metrics
+2. http://ui-daemon-host-name:8080/api/v1/topology/WordCount3-1-1402960825/metrics?sys=1
+3. http://ui-daemon-host-name:8080/api/v1/topology/WordCount3-1-1402960825/metrics?window=600
+```
+
+Sample response:
+
+```json
+{
+    "window":":all-time",
+    "window-hint":"All time",
+    "spouts":[
+        {
+            "id":"spout",
+            "emitted":[
+                {
+                    "stream_id":"__metrics",
+                    "value":20
+                },
+                {
+                    "stream_id":"default",
+                    "value":17350280
+                },
+                {
+                    "stream_id":"__ack_init",
+                    "value":17328160
+                },
+                {
+                    "stream_id":"__system",
+                    "value":20
+                }
+            ],
+            "transferred":[
+                {
+                    "stream_id":"__metrics",
+                    "value":20
+                },
+                {
+                    "stream_id":"default",
+                    "value":17350280
+                },
+                {
+                    "stream_id":"__ack_init",
+                    "value":17328160
+                },
+                {
+                    "stream_id":"__system",
+                    "value":0
+                }
+            ],
+            "acked":[
+                {
+                    "stream_id":"default",
+                    "value":17339180
+                }
+            ],
+            "failed":[
+
+            ],
+            "complete_ms_avg":[
+                {
+                    "stream_id":"default",
+                    "value":"920.497"
+                }
+            ]
+        }
+    ],
+    "bolts":[
+        {
+            "id":"count",
+            "emitted":[
+                {
+                    "stream_id":"__metrics",
+                    "value":120
+                },
+                {
+                    "stream_id":"default",
+                    "value":190748180
+                },
+                {
+                    "stream_id":"__ack_ack",
+                    "value":190718100
+                },
+                {
+                    "stream_id":"__system",
+                    "value":20
+                }
+            ],
+            "transferred":[
+                {
+                    "stream_id":"__metrics",
+                    "value":120
+                },
+                {
+                    "stream_id":"default",
+                    "value":0
+                },
+                {
+                    "stream_id":"__ack_ack",
+                    "value":190718100
+                },
+                {
+                    "stream_id":"__system",
+                    "value":0
+                }
+            ],
+            "acked":[
+                {
+                    "component_id":"split",
+                    "stream_id":"default",
+                    "value":190733160
+                }
+            ],
+            "failed":[
+
+            ],
+            "process_ms_avg":[
+                {
+                    "component_id":"split",
+                    "stream_id":"default",
+                    "value":"0.004"
+                }
+            ],
+            "executed":[
+                {
+                    "component_id":"split",
+                    "stream_id":"default",
+                    "value":190733140
+                }
+            ],
+            "executed_ms_avg":[
+                {
+                    "component_id":"split",
+                    "stream_id":"default",
+                    "value":"0.005"
+                }
+            ]
+        },
+        {
+            "id":"split",
+            "emitted":[
+                {
+                    "stream_id":"__metrics",
+                    "value":60
+                },
+                {
+                    "stream_id":"default",
+                    "value":190754740
+                },
+                {
+                    "stream_id":"__ack_ack",
+                    "value":17317580
+                },
+                {
+                    "stream_id":"__system",
+                    "value":20
+                }
+            ],
+            "transferred":[
+                {
+                    "stream_id":"__metrics",
+                    "value":60
+                },
+                {
+                    "stream_id":"default",
+                    "value":190754740
+                },
+                {
+                    "stream_id":"__ack_ack",
+                    "value":17317580
+                },
+                {
+                    "stream_id":"__system",
+                    "value":0
+                }
+            ],
+            "acked":[
+                {
+                    "component_id":"spout",
+                    "stream_id":"default",
+                    "value":17339180
+                }
+            ],
+            "failed":[
+
+            ],
+            "process_ms_avg":[
+                {
+                    "component_id":"spout",
+                    "stream_id":"default",
+                    "value":"0.051"
+                }
+            ],
+            "executed":[
+                {
+                    "component_id":"spout",
+                    "stream_id":"default",
+                    "value":17339240
+                }
+            ],
+            "executed_ms_avg":[
+                {
+                    "component_id":"spout",
+                    "stream_id":"default",
+                    "value":"0.052"
+                }
+            ]
+        }
+    ]
+}
+```
 
 ### /api/v1/topology/:id/component/:component (GET)
 

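The response format documented in the diff above can be consumed roughly as follows. This is a sketch under assumptions: `SAMPLE` is a trimmed copy of the sample response, and the helper `stream_values` is a hypothetical name, not part of Storm.

```python
import json

# Trimmed response in the documented shape; values taken from the sample.
SAMPLE = json.loads("""
{
  "window": ":all-time",
  "window-hint": "All time",
  "spouts": [
    {"id": "spout",
     "emitted": [{"stream_id": "__metrics", "value": 20},
                 {"stream_id": "default", "value": 17350280}],
     "complete_ms_avg": [{"stream_id": "default", "value": "920.497"}]}
  ],
  "bolts": [
    {"id": "split",
     "acked": [{"component_id": "spout", "stream_id": "default",
                "value": 17339180}],
     "process_ms_avg": [{"component_id": "spout", "stream_id": "default",
                         "value": "0.051"}]}
  ]
}
""")


def stream_values(component, field):
    """Map each stream key to its value for one metric field of a component.

    Output-side entries carry only stream_id; bolt input-side entries also
    carry component_id. The key is a (component_id, stream_id) tuple, with
    component_id set to None when the entry has none.
    """
    result = {}
    for entry in component.get(field, []):
        key = (entry.get("component_id"), entry["stream_id"])
        value = entry["value"]
        # *_ms_avg values arrive as strings holding a double.
        result[key] = float(value) if isinstance(value, str) else value
    return result
```

For example, `stream_values(SAMPLE["spouts"][0], "emitted")` yields one entry per output stream of the spout, and a field that is absent or empty yields an empty dict.
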
http://git-wip-us.apache.org/repos/asf/storm/blob/04372334/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 455691a..3ad955d 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -68,8 +68,9 @@
 (def ui:num-nimbus-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-nimbus-summary-http-requests"))
 (def ui:num-supervisor-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-supervisor-summary-http-requests"))
 (def ui:num-all-topologies-summary-http-requests (StormMetricsRegistry/registerMeter "ui:num-all-topologies-summary-http-requests"))
-(def ui:num-topology-page-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-page-http-requests"))
-(def ui:num-build-visualization-http-requests (StormMetricsRegistry/registerMeter "ui:num-build-visualization-http-requests"))
+(def ui:num-topology-page-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-page-http-requests"))
+(def ui:num-topology-metric-http-requests (StormMetricsRegistry/registerMeter "ui:num-topology-metric-http-requests"))
+(def ui:num-build-visualization-http-requests (StormMetricsRegistry/registerMeter "ui:num-build-visualization-http-requests"))
 (def ui:num-mk-visualization-data-http-requests (StormMetricsRegistry/registerMeter "ui:num-mk-visualization-data-http-requests"))
 (def ui:num-component-page-http-requests (StormMetricsRegistry/registerMeter "ui:num-component-page-http-requests"))
 (def ui:num-log-config-http-requests (StormMetricsRegistry/registerMeter "ui:num-log-config-http-requests"))
@@ -647,6 +648,102 @@
         "visualizationTable" []
         "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))))
 
+(defn- sum
+  [vals]
+  (reduce + vals))
+
+(defn- average
+  [vals]
+  (/ (sum vals) (count vals)))
+
+(defn- merge-with-conj [& mlist]
+  (let [flatten-keys (set (filter identity (flatten (map keys mlist))))
+        dict-keys-with-empty-list (zipmap flatten-keys (repeat '()))]
+    (apply merge-with conj dict-keys-with-empty-list mlist)))
+
+(defn- conj-specific-stats-by-field [window field-fn stats]
+  (apply merge-with-conj (map #(into {} (.get (field-fn %) window)) stats)))
+
+(defn- reduce-conj-specific-stats-by-field [agg-val-fn stats-map]
+  (let [fn-key-to-str (fn [key]
+                        (condp instance? key
+                          String {"stream_id" key}
+                          GlobalStreamId {"component_id" (.get_componentId key) "stream_id" (.get_streamId key)}
+                          {"stream_id" (str key)}))]
+    (reduce-kv #(conj %1 (merge (fn-key-to-str %2) {"value" (agg-val-fn %3)})) '() stats-map)))
+
+(defn- merge-stats-specific-field-by-stream
+  [window field-fn agg-val-fn stats]
+  (reduce-conj-specific-stats-by-field agg-val-fn
+    (conj-specific-stats-by-field window field-fn stats)))
+
+(defn- merge-executor-common-stats
+  [window executor-stats]
+  {"emitted"
+   (merge-stats-specific-field-by-stream window #(.get_emitted %) sum executor-stats)
+   "transferred"
+   (merge-stats-specific-field-by-stream window #(.get_transferred %) sum executor-stats)
+   })
+
+(defmulti merge-executor-specific-stats
+  (fn [_ specific-stats]
+    (if (.is_set_spout (first specific-stats)) :spout :bolt)))
+
+(defmethod merge-executor-specific-stats :spout
+  [window specific-stats]
+  (let [stats (map #(.get_spout %) specific-stats)]
+    {"acked"
+     (merge-stats-specific-field-by-stream window #(.get_acked %) sum stats)
+     "failed"
+     (merge-stats-specific-field-by-stream window #(.get_failed %) sum stats)
+     "complete_ms_avg"
+     (merge-stats-specific-field-by-stream window #(.get_complete_ms_avg %) #(StatsUtil/floatStr (average %)) stats)
+     }
+    ))
+
+(defmethod merge-executor-specific-stats :bolt
+  [window specific-stats]
+  (let [stats (map #(.get_bolt %) specific-stats)]
+    {"acked"
+     (merge-stats-specific-field-by-stream window #(.get_acked %) sum stats)
+     "failed"
+     (merge-stats-specific-field-by-stream window #(.get_failed %) sum stats)
+     "process_ms_avg"
+     (merge-stats-specific-field-by-stream window #(.get_process_ms_avg %) #(StatsUtil/floatStr (average %)) stats)
+     "executed"
+     (merge-stats-specific-field-by-stream window #(.get_executed %) sum stats)
+     "executed_ms_avg"
+     (merge-stats-specific-field-by-stream window #(.get_execute_ms_avg %) #(StatsUtil/floatStr (average %)) stats)
+     }
+    ))
+
+(defn merge-executor-stats [window component-id eslist]
+  (let [stats (map #(.get_stats %) eslist)
+        specific-stats (map #(.get_specific %) stats)]
+    (merge {"id" component-id}
+           (merge-executor-common-stats window stats)
+           (merge-executor-specific-stats window specific-stats))))
+
+(defn topology-metrics-page [id window include-sys?]
+  (thrift/with-configured-nimbus-connection nimbus
+    (let [window (if window window ":all-time")
+          window-hint (window-hint window)
+          topology (.getTopology ^Nimbus$Client nimbus id)
+          summ (->> (doto
+                      (GetInfoOptions.)
+                      (.set_num_err_choice NumErrorsChoice/NONE))
+                    (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
+          execs (.get_executors summ)
+          spout-summs (filter (partial spout-summary? topology) execs)
+          bolt-summs (filter (partial bolt-summary? topology) execs)
+          spout-comp-summs (group-by-comp spout-summs)
+          bolt-comp-summs (group-by-comp bolt-summs)
+          bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
+                                      bolt-comp-summs)
+          merged-spout-stats (map (fn [[k v]] (merge-executor-stats window k v)) spout-comp-summs)
+          merged-bolt-stats (map (fn [[k v]] (merge-executor-stats window k v)) bolt-comp-summs)]
+      (merge {"window" window "window-hint" window-hint "spouts" merged-spout-stats "bolts" merged-bolt-stats}))))
+
 (defn topology-lag [id topology-conf]
   (thrift/with-configured-nimbus-connection nimbus
     (let [topology (.getUserTopology ^Nimbus$Client nimbus
@@ -987,6 +1084,11 @@
     (assert-authorized-user "getTopology" (topology-config id))
     (let [user (get-user-name servlet-request)]
       (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
+  (GET "/api/v1/topology/:id/metrics" [:as {:keys [cookies servlet-request]} id & m]
+    (.mark ui:num-topology-metric-http-requests)
+    (populate-context! servlet-request)
+    (assert-authorized-user "getTopology" (topology-config id))
+    (json-response (topology-metrics-page id (:window m) (check-include-sys? (:sys m))) (:callback m)))
   (GET "/api/v1/topology/:id/lag" [:as {:keys [cookies servlet-request scheme]} id & m]
     (.mark ui:num-topology-lag-http-requests)
     (populate-context! servlet-request)
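
The aggregation added to core.clj above collects each executor's per-stream values into a list per stream key and then reduces each list with either a sum or an average. A rough Python sketch of that idea follows; the names `merge_by_stream` and `average` and the plain-dict inputs are illustrative stand-ins for the Thrift stats objects, not the actual implementation.

```python
from collections import defaultdict


def merge_by_stream(per_executor_stats, aggregate):
    """Collect values per stream key across executors, then aggregate them.

    per_executor_stats is an iterable of dicts mapping a stream key to a
    number, one dict per executor (an assumed simplification).
    """
    grouped = defaultdict(list)
    for stats in per_executor_stats:
        for stream_key, value in stats.items():
            grouped[stream_key].append(value)
    return {key: aggregate(values) for key, values in grouped.items()}


def average(values):
    """Unweighted mean over the collected values."""
    return sum(values) / len(values)
```

Counters such as emitted or acked would be merged with `sum`, latency fields with `average`. Note that the mean here is unweighted, as in the diff's `average` helper, so an executor that handled few messages counts as much as a busy one.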

