storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/30] storm git commit: port backtype.storm.stats to java
Date Tue, 15 Mar 2016 17:44:34 GMT
Repository: storm
Updated Branches:
  refs/heads/master afcb2a065 -> fa25f3d7f


http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 4b96620..bcf6e4f 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -21,13 +21,14 @@
         ring.middleware.multipart-params)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
-  (:use [org.apache.storm config util log stats zookeeper converter])
+  (:use [org.apache.storm config util log zookeeper converter])
   (:use [org.apache.storm.ui helpers])
   (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
                                               ACKER-FAIL-STREAM-ID mk-authorization-handler
                                               start-metrics-reporters]]])
   (:import [org.apache.storm.utils Time]
-           [org.apache.storm.generated NimbusSummary])
+           [org.apache.storm.generated NimbusSummary]
+           [org.apache.storm.stats StatsUtil])
   (:use [clojure.string :only [blank? lower-case trim split]])
   (:import [org.apache.storm.generated ExecutorSpecificStats
             ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
@@ -109,7 +110,7 @@
 
 (defn executor-summary-type
   [topology ^ExecutorSummary s]
-  (component-type topology (.get_component_id s)))
+  (StatsUtil/componentType topology (.get_component_id s)))
 
 (defn is-ack-stream
   [stream]
@@ -119,6 +120,12 @@
          ACKER-FAIL-STREAM-ID]]
     (every? #(not= %1 stream) acker-streams)))
 
+(defn mk-include-sys-fn
+  [include-sys?]
+  (if include-sys?
+    (fn [_] true)
+    (fn [stream] (and (string? stream) (not (Utils/isSystemId stream))))))
+
 (defn spout-summary?
   [topology s]
   (= :spout (executor-summary-type topology s)))
@@ -167,7 +174,7 @@
 (defn get-error-data
   [error]
   (if error
-    (error-subset (.get_error ^ErrorInfo error))
+    (StatsUtil/errorSubset (.get_error ^ErrorInfo error))
     ""))
 
 (defn get-error-port
@@ -234,23 +241,23 @@
                    bolt-summs (get bolt-comp-summs id)
                    spout-summs (get spout-comp-summs id)
                    bolt-cap (if bolt-summs
-                              (compute-bolt-capacity bolt-summs)
+                              (StatsUtil/computeBoltCapacity bolt-summs)
                               0)]
                {:type (if bolt-summs "bolt" "spout")
                 :capacity bolt-cap
                 :latency (if bolt-summs
                            (get-in
-                             (bolt-streams-stats bolt-summs true)
+                             (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs
true))
                              [:process-latencies window])
                            (get-in
-                             (spout-streams-stats spout-summs true)
+                             (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs
true))
                              [:complete-latencies window]))
                 :transferred (or
                                (get-in
-                                 (spout-streams-stats spout-summs true)
+                                 (clojurify-structure (StatsUtil/spoutStreamsStats spout-summs
true))
                                  [:transferred window])
                                (get-in
-                                 (bolt-streams-stats bolt-summs true)
+                                 (clojurify-structure (StatsUtil/boltStreamsStats bolt-summs
true))
                                  [:transferred window]))
                 :stats (let [mapfn (fn [dat]
                                      (map (fn [^ExecutorSummary summ]
@@ -492,7 +499,7 @@
        "window" w
        "emitted" (get-in stats [:emitted w])
        "transferred" (get-in stats [:transferred w])
-       "completeLatency" (float-str (get-in stats [:complete-latencies w]))
+       "completeLatency" (StatsUtil/floatStr (get-in stats [:complete-latencies w]))
        "acked" (get-in stats [:acked w])
        "failed" (get-in stats [:failed w])})))
 
@@ -555,7 +562,7 @@
       (get-error-json topo-id (.get_last_error s) secure?)
       {"spoutId" id
        "encodedSpoutId" (URLEncoder/encode id)
-       "completeLatency" (float-str (.get_complete_latency_ms ss))})))
+       "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms ss))})))
 
 (defmethod comp-agg-stats-json ComponentType/BOLT
   [topo-id secure? [id ^ComponentAggregateStats s]]
@@ -566,10 +573,10 @@
       (get-error-json topo-id (.get_last_error s) secure?)
       {"boltId" id
        "encodedBoltId" (URLEncoder/encode id)
-       "capacity" (float-str (.get_capacity ss))
-       "executeLatency" (float-str (.get_execute_latency_ms ss))
+       "capacity" (StatsUtil/floatStr (.get_capacity ss))
+       "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms ss))
        "executed" (.get_executed ss)
-       "processLatency" (float-str (.get_process_latency_ms ss))})))
+       "processLatency" (StatsUtil/floatStr (.get_process_latency_ms ss))})))
 
 (defn- unpack-topology-page-info
   "Unpacks the serialized object to data structures"
@@ -679,10 +686,10 @@
      "transferred" (.get_transferred comm-s)
      "acked" (.get_acked comm-s)
      "failed" (.get_failed comm-s)
-     "executeLatency" (float-str (.get_execute_latency_ms bolt-s))
-     "processLatency"  (float-str (.get_process_latency_ms bolt-s))
+     "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bolt-s))
+     "processLatency"  (StatsUtil/floatStr (.get_process_latency_ms bolt-s))
      "executed" (.get_executed bolt-s)
-     "capacity" (float-str (.get_capacity bolt-s))}))
+     "capacity" (StatsUtil/floatStr (.get_capacity bolt-s))}))
 
 (defmethod unpack-comp-agg-stat ComponentType/SPOUT
   [[window ^ComponentAggregateStats s]]
@@ -695,7 +702,7 @@
      "transferred" (.get_transferred comm-s)
      "acked" (.get_acked comm-s)
      "failed" (.get_failed comm-s)
-     "completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
+     "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))}))
 
 (defn- unpack-bolt-input-stat
   [[^GlobalStreamId s ^ComponentAggregateStats stats]]
@@ -706,8 +713,8 @@
     {"component" comp-id
      "encodedComponentId" (URLEncoder/encode comp-id)
      "stream" (.get_streamId s)
-     "executeLatency" (float-str (.get_execute_latency_ms bas))
-     "processLatency" (float-str (.get_process_latency_ms bas))
+     "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
+     "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas))
      "executed" (Utils/nullToZero (.get_executed bas))
      "acked" (Utils/nullToZero (.get_acked cas))
      "failed" (Utils/nullToZero (.get_failed cas))}))
@@ -730,7 +737,7 @@
     {"stream" stream-id
      "emitted" (Utils/nullToZero (.get_emitted cas))
      "transferred" (Utils/nullToZero (.get_transferred cas))
-     "completeLatency" (float-str (.get_complete_latency_ms spout-s))
+     "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))
      "acked" (Utils/nullToZero (.get_acked cas))
      "failed" (Utils/nullToZero (.get_failed cas))}))
 
@@ -757,10 +764,10 @@
      "port" port
      "emitted" (Utils/nullToZero (.get_emitted cas))
      "transferred" (Utils/nullToZero (.get_transferred cas))
-     "capacity" (float-str (Utils/nullToZero (.get_capacity bas)))
-     "executeLatency" (float-str (.get_execute_latency_ms bas))
+     "capacity" (StatsUtil/floatStr (Utils/nullToZero (.get_capacity bas)))
+     "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
      "executed" (Utils/nullToZero (.get_executed bas))
-     "processLatency" (float-str (.get_process_latency_ms bas))
+     "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas))
      "acked" (Utils/nullToZero (.get_acked cas))
      "failed" (Utils/nullToZero (.get_failed cas))
      "workerLogLink" (worker-log-link host port topology-id secure?)}))
@@ -785,7 +792,7 @@
      "port" port
      "emitted" (Utils/nullToZero (.get_emitted cas))
      "transferred" (Utils/nullToZero (.get_transferred cas))
-     "completeLatency" (float-str (.get_complete_latency_ms sas))
+     "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms sas))
      "acked" (Utils/nullToZero (.get_acked cas))
      "failed" (Utils/nullToZero (.get_failed cas))
      "workerLogLink" (worker-log-link host port topology-id secure?)}))

http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index ce58f42..a76db54 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -15,14 +15,15 @@
 ;; limitations under the License.
 (ns org.apache.storm.nimbus-test
   (:use [clojure test])
-  (:require [org.apache.storm [util :as util] [stats :as stats]])
+  (:require [org.apache.storm [util :as util]])
   (:require [org.apache.storm.daemon [nimbus :as nimbus]])
   (:require [org.apache.storm [converter :as converter]])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
             TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
            [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
            [org.apache.storm.generated GlobalStreamId]
-           [org.apache.storm Thrift])
+           [org.apache.storm Thrift]
+           [org.apache.storm.stats StatsUtil])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
   (:import [org.apache.storm.scheduler INimbus])
   (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
@@ -139,7 +140,8 @@
         curr-beat (.get-worker-heartbeat state storm-id node port)
         stats (:executor-stats curr-beat)]
     (.worker-heartbeat! state storm-id node port
-      {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge
stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}
+      {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10
+       :executor-stats (merge stats {executor (clojurify-structure (StatsUtil/renderStats
(StatsUtil/mkBoltStats 20)))})}
       )))
 
 (defn slot-assignments [cluster storm-id]


Mime
View raw message