storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/6] storm git commit: [STORM-1158] - Storm metrics to profile various storm functions
Date Fri, 06 Nov 2015 17:08:10 GMT
Repository: storm
Updated Branches:
  refs/heads/master b24b0fece -> 1a94d9e28


[STORM-1158] - Storm metrics to profile various storm functions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b98acd3c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b98acd3c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b98acd3c

Branch: refs/heads/master
Commit: b98acd3c41278870e10cc0c1123ae69f24070ad6
Parents: f3ed08b
Author: Boyang Jerry Peng <jerrypeng@yahoo-inc.com>
Authored: Wed Nov 4 13:45:12 2015 -0600
Committer: Boyang Jerry Peng <jerrypeng@yahoo-inc.com>
Committed: Wed Nov 4 13:45:12 2015 -0600

----------------------------------------------------------------------
 pom.xml                                         | 12 ++++
 storm-core/pom.xml                              | 47 ++++++++++++++
 .../src/clj/backtype/storm/daemon/common.clj    |  5 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      | 18 +++++-
 .../src/clj/backtype/storm/daemon/logviewer.clj | 19 +++++-
 .../src/clj/backtype/storm/daemon/nimbus.clj    | 64 +++++++++++++++++++-
 .../clj/backtype/storm/daemon/supervisor.clj    | 10 ++-
 storm-core/src/clj/backtype/storm/ui/core.clj   | 44 +++++++++++++-
 .../src/clj/backtype/storm/ui/helpers.clj       | 11 +++-
 9 files changed, 220 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 008a988..0c2b8cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.1</log4j.version>
         <slf4j.version>1.7.7</slf4j.version>
+        <metrics.version>3.1.0</metrics.version>
         <clojure.tools.nrepl.version>0.2.3</clojure.tools.nrepl.version>
         <clojure-complete.version>0.2.3</clojure-complete.version>
         <mockito.version>1.9.5</mockito.version>
@@ -213,6 +214,7 @@
         <jackson.version>2.3.1</jackson.version>
         <thrift.version>0.9.2</thrift.version>
         <junit.version>4.11</junit.version>
+        <metrics-clojure.version>2.5.1</metrics-clojure.version>
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
     </properties>
 
@@ -594,6 +596,16 @@
                 <version>${netty.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${metrics.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>metrics-clojure</groupId>
+                <artifactId>metrics-clojure</artifactId>
+                <version>${metrics-clojure.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.clojure</groupId>
                 <artifactId>clojure-contrib</artifactId>
                 <version>${clojure-contrib.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 87a5804..e20d9da 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -239,6 +239,14 @@
             <artifactId>netty</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>metrics-clojure</groupId>
+            <artifactId>metrics-clojure</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>
@@ -438,6 +446,8 @@
                             <include>org.clojure:tools.namespace</include>
                             <include>cheshire:cheshire</include>
                             <include>org.clojure:core.incubator</include>
+                            <include>io.dropwizard.metrics:*</include>
+                            <include>metrics-clojure:*</include>
                         </includes>
                     </artifactSet>
                     <relocations>
@@ -602,6 +612,42 @@
                           <pattern>org.eclipse.jetty</pattern>
                           <shadedPattern>org.apache.storm.shade.org.eclipse.jetty</shadedPattern>
                         </relocation>
+                        <relocation>
+                            <pattern>com.codahale.metrics</pattern>
+                            <shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.core</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.counters</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.counters</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.gauges</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.gauges</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.histograms</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.histograms</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.meters</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.meters</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.reporters</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.reporters</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.timers</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.timers</shadedPattern>
+                        </relocation>
+                        <relocation>
+                            <pattern>metrics.utils</pattern>
+                            <shadedPattern>org.apache.storm.shade.metrics.utils</shadedPattern>
+                        </relocation>
                     </relocations>
                     <transformers>
                         <transformer implementation="org.apache.storm.maven.shade.clojure.ClojureTransformer"
/>
@@ -613,6 +659,7 @@
                              the majority of cases correctly.  However, the Clojure-Transformer
does not shade everything correctly all the
                              time.  Instead of spending a lot of time to get the Clojure-Transformer
to parse Clojure correctly we opted to remove
                              the .clj files from the uber jar. -->
+                        <filter><artifact>metrics-clojure:*</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
                         <filter><artifact>org.clojure:core.incubator</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
                         <filter><artifact>cheshire:cheshire</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
                         <filter><artifact>org.clojure:tools.logging</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index 66b87f4..35ae139 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -27,7 +27,10 @@
   (:require [clojure.set :as set])  
   (:require [backtype.storm.daemon.acker :as acker])
   (:require [backtype.storm.thrift :as thrift])
-  )
+  (:require [metrics.reporters.jmx :as jmx]))
+
+(defn start-metrics-reporters []
+  (jmx/start (jmx/reporter {})))
 
 (def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
 (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj
index abaf2e7..40744fb 100644
--- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj
@@ -31,8 +31,16 @@
   (:use compojure.core)
   (:use ring.middleware.reload)
   (:require [compojure.handler :as handler])
+  (:require [metrics.meters :refer [defmeter mark!]])
   (:gen-class))
 
+(defmeter drpc:num-execute-http-requests)
+(defmeter drpc:num-execute-calls)
+(defmeter drpc:num-result-calls)
+(defmeter drpc:num-failRequest-calls)
+(defmeter drpc:num-fetchRequest-calls)
+(defmeter drpc:num-shutdown-calls)
+
 (defn timeout-check-secs [] 5)
 
 (defn acquire-queue [queues-atom function]
@@ -87,6 +95,7 @@
     (reify DistributedRPC$Iface
       (^String execute
         [this ^String function ^String args]
+        (mark! drpc:num-execute-calls)
         (log-debug "Received DRPC request for " function " (" args ") at " (System/currentTimeMillis))
         (check-authorization drpc-acl-handler
                              {DRPCAuthorizerBase/FUNCTION_NAME function}
@@ -116,6 +125,7 @@
 
       (^void result
         [this ^String id ^String result]
+        (mark! drpc:num-result-calls)
         (when-let [func (@id->function id)]
           (check-authorization drpc-acl-handler
                                {DRPCAuthorizerBase/FUNCTION_NAME func}
@@ -129,6 +139,7 @@
 
       (^void failRequest
         [this ^String id]
+        (mark! drpc:num-failRequest-calls)
         (when-let [func (@id->function id)]
           (check-authorization drpc-acl-handler
                                {DRPCAuthorizerBase/FUNCTION_NAME func}
@@ -140,6 +151,7 @@
 
       (^DRPCRequest fetchRequest
         [this ^String func]
+        (mark! drpc:num-fetchRequest-calls)
         (check-authorization drpc-acl-handler
                              {DRPCAuthorizerBase/FUNCTION_NAME func}
                              "fetchRequest")
@@ -154,6 +166,7 @@
 
       (shutdown
         [this]
+        (mark! drpc:num-shutdown-calls)
         (.interrupt clear-thread)))))
 
 (defn handle-request [handler]
@@ -167,6 +180,7 @@
       (.populateContext http-creds-handler (ReqContext/context) servlet-request)))
 
 (defn webapp [handler http-creds-handler]
+  (mark! drpc:num-execute-http-requests)
   (->
     (routes
       (POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
@@ -215,7 +229,8 @@
       (log-message "Starting Distributed RPC servers...")
       (future (.serve invoke-server))
       (when (> drpc-http-port 0)
-        (let [app (webapp drpc-service-handler http-creds-handler)
+        (let [app (-> (webapp drpc-service-handler http-creds-handler)
+                    requests-middleware)
               filter-class (conf DRPC-HTTP-FILTER)
               filter-params (conf DRPC-HTTP-FILTER-PARAMS)
               filters-confs [{:filter-class filter-class
@@ -246,6 +261,7 @@
                                         https-need-client-auth
                                         https-want-client-auth)
                             (config-filter server app filters-confs))})))
+      (start-metrics-reporters)
       (when handler-server
         (.serve handler-server)))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 5303a25..5fe2e26 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -39,10 +39,18 @@
             [ring.util.codec :as codec]
             [ring.util.response :as resp]
             [clojure.string :as string])
+  (:require [metrics.meters :refer [defmeter mark!]])
+  (:use [backtype.storm.daemon.common :only [start-metrics-reporters]])
   (:gen-class))
 
 (def ^:dynamic *STORM-CONF* (read-storm-config))
 
+(defmeter num:logviewer-log-page-httpRequests)
+(defmeter num:logviewer-daemonlog-page-httpRequests)
+(defmeter num:logviewer-download-log-file-httpRequests)
+(defmeter num:logviewer-download-log-daemon-file-httpRequests)
+(defmeter num:logviewer-list-logs-httpRequests)
+
 (defn cleanup-cutoff-age-millis [conf now-millis]
   (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
 
@@ -537,6 +545,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
 (defroutes log-routes
   (GET "/log" [:as req & m]
     (try
+      (mark! num:logviewer-log-page-httpRequests)
       (let [servlet-request (:servlet-request req)
             log-root (:log-root req)
             user (.getUserName http-creds-handler servlet-request)
@@ -595,6 +604,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
            (resp/status 404)))))
   (GET "/daemonlog" [:as req & m]
     (try
+      (mark! num:logviewer-daemonlog-page-httpRequests)
       (let [servlet-request (:servlet-request req)
             daemonlog-root (:daemonlog-root req)
             user (.getUserName http-creds-handler servlet-request)
@@ -608,6 +618,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (ring-response-from-exception ex))))
   (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file &
m]
     (try
+      (mark! num:logviewer-download-log-file-httpRequests)
       (let [user (.getUserName http-creds-handler servlet-request)]
         (download-log-file file servlet-request servlet-response user log-root))
       (catch InvalidRequestException ex
@@ -615,6 +626,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (ring-response-from-exception ex))))
   (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]}
file & m]
     (try
+      (mark! num:logviewer-download-log-daemon-file-httpRequests)
       (let [user (.getUserName http-creds-handler servlet-request)]
         (download-log-file file servlet-request servlet-response user daemonlog-root))
       (catch InvalidRequestException ex
@@ -622,6 +634,7 @@ Note that if anything goes wrong, this will throw an Error and exit."
         (ring-response-from-exception ex))))
   (GET "/listLogs" [:as req & m]
     (try
+      (mark! num:logviewer-list-logs-httpRequests)
       (let [servlet-request (:servlet-request req)
             user (.getUserName http-creds-handler servlet-request)]
         (list-log-files user
@@ -647,7 +660,8 @@ Note that if anything goes wrong, this will throw an Error and exit."
     (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
           filter-class (conf UI-FILTER)
           filter-params (conf UI-FILTER-PARAMS)
-          logapp (handler/api log-routes) ;; query params as map
+          logapp (handler/api (-> log-routes
+                                requests-middleware))  ;; query params as map
           middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
           filters-confs (if (conf UI-FILTER)
                           [{:filter-class filter-class
@@ -690,4 +704,5 @@ Note that if anything goes wrong, this will throw an Error and exit."
         daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
     (setup-default-uncaught-exception-handler)
     (start-log-cleaner! conf log-root)
-    (start-logviewer! conf log-root daemonlog-root)))
+    (start-logviewer! conf log-root daemonlog-root)
+    (start-metrics-reporters)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 205c6f4..edb3d4c 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -51,9 +51,39 @@
   (:import [backtype.storm.utils VersionInfo])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
+  (:require [metrics.meters :refer [defmeter mark!]])
+  (:require [metrics.gauges :refer [defgauge]])
   (:gen-class
     :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))
 
+
+(defmeter nimbus:num-submitTopologyWithOpts-calls)
+(defmeter nimbus:num-submitTopology-calls)
+(defmeter nimbus:num-killTopologyWithOpts-calls)
+(defmeter nimbus:num-killTopology-calls)
+(defmeter nimbus:num-rebalance-calls)
+(defmeter nimbus:num-activate-calls)
+(defmeter nimbus:num-deactivate-calls)
+(defmeter nimbus:num-debug-calls)
+(defmeter nimbus:num-setLogConfig-calls)
+(defmeter nimbus:num-uploadNewCredentials-calls)
+(defmeter nimbus:num-beginFileUpload-calls)
+(defmeter nimbus:num-uploadChunk-calls)
+(defmeter nimbus:num-finishFileUpload-calls)
+(defmeter nimbus:num-beginFileDownload-calls)
+(defmeter nimbus:num-downloadChunk-calls)
+(defmeter nimbus:num-getNimbusConf-calls)
+(defmeter nimbus:num-getLogConfig-calls)
+(defmeter nimbus:num-getTopologyConf-calls)
+(defmeter nimbus:num-getTopology-calls)
+(defmeter nimbus:num-getUserTopology-calls)
+(defmeter nimbus:num-getClusterInfo-calls)
+(defmeter nimbus:num-getTopologyInfoWithOpts-calls)
+(defmeter nimbus:num-getTopologyInfo-calls)
+(defmeter nimbus:num-getTopologyPageInfo-calls)
+(defmeter nimbus:num-getComponentPageInfo-calls)
+(defmeter nimbus:num-shutdown-calls)
+
 (defn file-cache-map [conf]
   (TimeCacheMap.
    (int (conf NIMBUS-FILE-COPY-EXPIRATION-SECS))
@@ -1193,11 +1223,18 @@
                         (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
                         (fn []
                           (renew-credentials nimbus)))
+
+    (defgauge nimbus:num-supervisors
+      (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
+
+    (start-metrics-reporters)
+
     (reify Nimbus$Iface
       (^void submitTopologyWithOpts
         [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology
topology
          ^SubmitOptions submitOptions]
         (try
+          (mark! nimbus:num-submitTopologyWithOpts-calls)
           (is-leader nimbus)
           (assert (not-nil? submitOptions))
           (validate-topology-name! storm-name)
@@ -1272,13 +1309,16 @@
       
       (^void submitTopology
         [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology
topology]
+        (mark! nimbus:num-submitTopology-calls)
         (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                  (SubmitOptions. TopologyInitialStatus/ACTIVE)))
       
       (^void killTopology [this ^String name]
-         (.killTopologyWithOpts this name (KillOptions.)))
+        (mark! nimbus:num-killTopology-calls)
+        (.killTopologyWithOpts this name (KillOptions.)))
 
       (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
+        (mark! nimbus:num-killTopologyWithOpts-calls)
         (check-storm-active! nimbus storm-name true)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)]
           (check-authorization! nimbus storm-name topology-conf "killTopology"))
@@ -1289,6 +1329,7 @@
           ))
 
       (^void rebalance [this ^String storm-name ^RebalanceOptions options]
+        (mark! nimbus:num-rebalance-calls)
         (check-storm-active! nimbus storm-name true)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)]
           (check-authorization! nimbus storm-name topology-conf "rebalance"))
@@ -1307,17 +1348,20 @@
           ))
 
       (activate [this storm-name]
+        (mark! nimbus:num-activate-calls)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)]
           (check-authorization! nimbus storm-name topology-conf "activate"))
         (transition-name! nimbus storm-name :activate true)
         )
 
       (deactivate [this storm-name]
+        (mark! nimbus:num-deactivate-calls)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)]
           (check-authorization! nimbus storm-name topology-conf "deactivate"))
         (transition-name! nimbus storm-name :inactivate true))
 
       (debug [this storm-name component-id enable? samplingPct]
+        (mark! nimbus:num-debug-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               storm-id (get-storm-id storm-cluster-state storm-name)
               topology-conf (try-read-storm-conf conf storm-id)
@@ -1367,6 +1411,7 @@
           latest-profile-actions))
 
       (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
+        (mark! nimbus:num-setLogConfig-calls)
         (let [topology-conf (try-read-storm-conf conf id)
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
@@ -1392,6 +1437,7 @@
             (.set-topology-log-config! storm-cluster-state id merged-log-config)))
 
       (uploadNewCredentials [this storm-name credentials]
+        (mark! nimbus:num-uploadNewCredentials-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               storm-id (get-storm-id storm-cluster-state storm-name)
               topology-conf (try-read-storm-conf conf storm-id)
@@ -1400,6 +1446,7 @@
           (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id
creds topology-conf))))
 
       (beginFileUpload [this]
+        (mark! nimbus:num-beginFileUpload-calls)
         (check-authorization! nimbus nil nil "fileUpload")
         (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
           (.put (:uploaders nimbus)
@@ -1410,6 +1457,7 @@
           ))
 
       (^void uploadChunk [this ^String location ^ByteBuffer chunk]
+        (mark! nimbus:num-uploadChunk-calls)
         (check-authorization! nimbus nil nil "fileUpload")
         (let [uploaders (:uploaders nimbus)
               ^WritableByteChannel channel (.get uploaders location)]
@@ -1421,6 +1469,7 @@
           ))
 
       (^void finishFileUpload [this ^String location]
+        (mark! nimbus:num-finishFileUpload-calls)
         (check-authorization! nimbus nil nil "fileUpload")
         (let [uploaders (:uploaders nimbus)
               ^WritableByteChannel channel (.get uploaders location)]
@@ -1433,6 +1482,7 @@
           ))
 
       (^String beginFileDownload [this ^String file]
+        (mark! nimbus:num-beginFileDownload-calls)
         (check-authorization! nimbus nil nil "fileDownload")
         (check-file-access (:conf nimbus) file)
         (let [is (BufferFileInputStream. file)
@@ -1442,6 +1492,7 @@
           ))
 
       (^ByteBuffer downloadChunk [this ^String id]
+        (mark! nimbus:num-downloadChunk-calls)
         (check-authorization! nimbus nil nil "fileDownload")
         (let [downloaders (:downloaders nimbus)
               ^BufferFileInputStream is (.get downloaders id)]
@@ -1456,10 +1507,12 @@
             )))
 
       (^String getNimbusConf [this]
+        (mark! nimbus:num-getNimbusConf-calls)
         (check-authorization! nimbus nil nil "getNimbusConf")
         (to-json (:conf nimbus)))
 
       (^LogConfig getLogConfig [this ^String id]
+        (mark! nimbus:num-getLogConfig-calls)
         (let [topology-conf (try-read-storm-conf conf id)
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
@@ -1468,24 +1521,28 @@
            (if log-config log-config (LogConfig.))))
 
       (^String getTopologyConf [this ^String id]
+        (mark! nimbus:num-getTopologyConf-calls)
         (let [topology-conf (try-read-storm-conf conf id)
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
               (to-json topology-conf)))
 
       (^StormTopology getTopology [this ^String id]
+        (mark! nimbus:num-getTopology-calls)
         (let [topology-conf (try-read-storm-conf conf id)
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopology")
               (system-topology! topology-conf (try-read-storm-topology conf id))))
 
       (^StormTopology getUserTopology [this ^String id]
+        (mark! nimbus:num-getUserTopology-calls)
         (let [topology-conf (try-read-storm-conf conf id)
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getUserTopology")
               (try-read-storm-topology topology-conf id)))
 
       (^ClusterSummary getClusterInfo [this]
+        (mark! nimbus:num-getClusterInfo-calls)
         (check-authorization! nimbus nil nil "getClusterInfo")
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               supervisor-infos (all-supervisor-info storm-cluster-state)
@@ -1550,6 +1607,7 @@
           ))
       
       (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
+        (mark! nimbus:num-getTopologyInfoWithOpts-calls)
         (let [{:keys [storm-name
                       storm-cluster-state
                       all-components
@@ -1613,12 +1671,14 @@
           ))
 
       (^TopologyInfo getTopologyInfo [this ^String topology-id]
+        (mark! nimbus:num-getTopologyInfo-calls)
         (.getTopologyInfoWithOpts this
                                   topology-id
                                   (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
 
       (^TopologyPageInfo getTopologyPageInfo
         [this ^String topo-id ^String window ^boolean include-sys?]
+        (mark! nimbus:num-getTopologyPageInfo-calls)
         (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
 
               exec->node+port (:executor->node+port (:assignment info))
@@ -1665,6 +1725,7 @@
          ^String component-id
          ^String window
          ^boolean include-sys?]
+        (mark! nimbus:num-getComponentPageInfo-calls)
         (let [info (get-common-topo-info topo-id "getComponentPageInfo")
               {:keys [executor->node+port node->host]} (:assignment info)
               executor->host+port (map-val (fn [[node port]]
@@ -1709,6 +1770,7 @@
 
       Shutdownable
       (shutdown [this]
+        (mark! nimbus:num-shutdown-calls)
         (log-message "Shutting down master")
         (cancel-timer (:timer nimbus))
         (.disconnect (:storm-cluster-state nimbus))

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index c86b73f..adc6268 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -34,10 +34,13 @@
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
   (:import [org.yaml.snakeyaml Yaml]
            [org.yaml.snakeyaml.constructor SafeConstructor])
-  (:import [java.util Date])
+  (:require [metrics.gauges :refer [defgauge]])
+  (:require [metrics.meters :refer [defmeter mark!]])
   (:gen-class
     :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))
 
+(defmeter supervisor:num-workers-launched)
+
 (defmulti download-storm-code cluster-mode)
 (defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
 (defmulti mk-code-distributor cluster-mode)
@@ -419,6 +422,7 @@
                                port
                                id
                                mem-onheap)
+                (mark! supervisor:num-workers-launched)
                 (catch java.io.FileNotFoundException e
                   (log-message "Unable to launch worker due to "
                                (.getMessage e)))
@@ -957,7 +961,9 @@
   (let [conf (read-storm-config)]
     (validate-distributed-mode! conf)
     (let [supervisor (mk-supervisor conf nil supervisor)]
-      (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))))
+      (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))
+    (defgauge supervisor:num-slots-used-gauge #(count (my-worker-ids conf)))
+    (start-metrics-reporters)))
 
 (defn standalone-supervisor []
   (let [conf-atom (atom nil)

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 60404f0..7fa367b 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -24,7 +24,8 @@
   (:use [backtype.storm config util log stats tuple zookeeper converter])
   (:use [backtype.storm.ui helpers])
   (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
-                                              ACKER-FAIL-STREAM-ID mk-authorization-handler]]])
+                                              ACKER-FAIL-STREAM-ID mk-authorization-handler
+                                              start-metrics-reporters]]])
   (:import [backtype.storm.utils Utils]
            [backtype.storm.generated NimbusSummary])
   (:use [clojure.string :only [blank? lower-case trim split]])
@@ -46,6 +47,7 @@
             [compojure.handler :as handler]
             [ring.util.response :as resp]
             [backtype.storm [thrift :as thrift]])
+  (:require [metrics.meters :refer [defmeter mark!]])
   (:import [org.apache.commons.lang StringEscapeUtils])
   (:import [org.apache.logging.log4j Level])
   (:gen-class))
@@ -53,9 +55,27 @@
 (def ^:dynamic *STORM-CONF* (read-storm-config))
 (def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER)
*STORM-CONF*))
 (def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER)
*STORM-CONF*))
-
 (def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
 
+(defmeter ui:num-cluster-configuration-httpRequests)
+(defmeter ui:num-cluster-summary-httpRequests)
+(defmeter ui:num-nimbus-summary-httpRequests)
+(defmeter ui:num-supervisor-summary-httpRequests)
+(defmeter ui:num-all-topologies-summary-httpRequests)
+(defmeter ui:num-topology-page-httpRequests)
+(defmeter ui:num-build-visualization-httpRequests)
+(defmeter ui:num-mk-visualization-data-httpRequests)
+(defmeter ui:num-component-page-httpRequests)
+(defmeter ui:num-log-config-httpRequests)
+(defmeter ui:num-activate-topology-httpRequests)
+(defmeter ui:num-deactivate-topology-httpRequests)
+(defmeter ui:num-debug-topology-httpRequests)
+(defmeter ui:num-component-op-response-httpRequests)
+(defmeter ui:num-topology-op-response-httpRequests)
+(defmeter ui:num-topology-op-response-httpRequests)
+(defmeter ui:num-topology-op-response-httpRequests)
+(defmeter ui:num-main-page-httpRequests)
+
 (defn assert-authorized-user
   ([op]
     (assert-authorized-user op nil))
@@ -867,39 +887,48 @@
 
 (defroutes main-routes
   (GET "/api/v1/cluster/configuration" [& m]
+    (mark! ui:num-cluster-configuration-httpRequests)
     (json-response (cluster-configuration)
                    (:callback m) :serialize-fn identity))
   (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
+    (mark! ui:num-cluster-summary-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getClusterInfo")
     (let [user (get-user-name servlet-request)]
       (json-response (cluster-summary user) (:callback m))))
   (GET "/api/v1/nimbus/summary" [:as {:keys [cookies servlet-request]} & m]
+    (mark! ui:num-nimbus-summary-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getClusterInfo")
     (json-response (nimbus-summary) (:callback m)))
   (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m]
+    (mark! ui:num-supervisor-summary-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getClusterInfo")
     (json-response (supervisor-summary) (:callback m)))
   (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
+    (mark! ui:num-all-topologies-summary-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getClusterInfo")
     (json-response (all-topologies-summary) (:callback m)))
   (GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request scheme]} id & m]
+    (mark! ui:num-topology-page-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getTopology" (topology-config id))
     (let [user (get-user-name servlet-request)]
       (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (=
scheme :https)) (:callback m))))
   (GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id
& m]
+    (mark! ui:num-build-visualization-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getTopology" (topology-config id))
     (json-response (build-visualization id (:window m) (check-include-sys? (:sys m))) (:callback
m)))
   (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id &
m]
+    (mark! ui:num-mk-visualization-data-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getTopology" (topology-config id))
     (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback
m)))
   (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]}
id component & m]
+    (mark! ui:num-component-page-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getTopology" (topology-config id))
     (let [user (get-user-name servlet-request)]
@@ -907,10 +936,12 @@
           (component-page id component (:window m) (check-include-sys? (:sys m)) user (=
scheme :https))
           (:callback m))))
   (GET "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id & m]
+    (mark! ui:num-log-config-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "getTopology" (topology-config id))
        (json-response (log-config id) (:callback m)))
   (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
+    (mark! ui:num-activate-topology-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "activate" (topology-config id))
     (thrift/with-configured-nimbus-connection nimbus
@@ -923,6 +954,7 @@
         (log-message "Activating topology '" name "'")))
     (json-response (topology-op-response id "activate") (m "callback")))
   (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id &
m]
+    (mark! ui:num-deactivate-topology-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "deactivate" (topology-config id))
     (thrift/with-configured-nimbus-connection nimbus
@@ -935,6 +967,7 @@
         (log-message "Deactivating topology '" name "'")))
     (json-response (topology-op-response id "deactivate") (m "callback")))
   (POST "/api/v1/topology/:id/debug/:action/:spct" [:as {:keys [cookies servlet-request]}
id action spct & m]
+    (mark! ui:num-debug-topology-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "debug" (topology-config id))
     (thrift/with-configured-nimbus-connection nimbus
@@ -948,6 +981,7 @@
         (log-message "Debug topology [" name "] action [" action "] sampling pct [" spct
"]")))
      (json-response (topology-op-response id (str "debug/" action)) (m "callback")))
   (POST "/api/v1/topology/:id/component/:component/debug/:action/:spct" [:as {:keys [cookies
servlet-request]} id component action spct & m]
+    (mark! ui:num-component-op-response-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "debug" (topology-config id))
     (thrift/with-configured-nimbus-connection nimbus
@@ -961,6 +995,7 @@
         (log-message "Debug topology [" name "] component [" component "] action [" action
"] sampling pct [" spct "]")))
     (json-response (component-op-response id component (str "/debug/" action)) (m "callback")))
   (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]}
id wait-time & m]
+    (mark! ui:num-topology-op-response-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "rebalance" (topology-config id))
     (thrift/with-configured-nimbus-connection nimbus
@@ -981,6 +1016,7 @@
         (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
     (json-response (topology-op-response id "rebalance") (m "callback")))
   (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id
wait-time & m]
+    (mark! ui:num-topology-op-response-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "killTopology" (topology-config id))
     (thrift/with-configured-nimbus-connection nimbus
@@ -995,6 +1031,7 @@
         (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
     (json-response (topology-op-response id "kill") (m "callback")))
   (POST "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id namedLoggerLevels
& m]
+    (mark! ui:num-topology-op-response-httpRequests)
     (populate-context! servlet-request)
     (assert-authorized-user "setLogConfig" (topology-config id))
     (thrift/with-configured-nimbus-connection
@@ -1143,6 +1180,7 @@
                           (m "callback")))))
   
   (GET "/" [:as {cookies :cookies}]
+    (mark! ui:num-main-page-httpRequests)
     (resp/redirect "/index.html"))
   (route/resources "/")
   (route/not-found "Page not found"))
@@ -1160,6 +1198,7 @@
                     (wrap-json-params)
                     (wrap-multipart-params)
                     (wrap-reload '[backtype.storm.ui.core])
+                    requests-middleware
                     catch-errors)))
 
 (defn start-server!
@@ -1179,6 +1218,7 @@
           https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
           https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
           https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
+      (start-metrics-reporters)
       (storm-run-jetty {:port (conf UI-PORT)
                         :host (conf UI-HOST)
                         :https-port https-port

http://git-wip-us.apache.org/repos/asf/storm/blob/b98acd3c/storm-core/src/clj/backtype/storm/ui/helpers.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/helpers.clj b/storm-core/src/clj/backtype/storm/ui/helpers.clj
index 9b82aaa..43cfa22 100644
--- a/storm-core/src/clj/backtype/storm/ui/helpers.clj
+++ b/storm-core/src/clj/backtype/storm/ui/helpers.clj
@@ -34,7 +34,16 @@
            [org.eclipse.jetty.servlets CrossOriginFilter])
   (:require [ring.util servlet])
   (:require [compojure.route :as route]
-            [compojure.handler :as handler]))
+            [compojure.handler :as handler])
+  (:require [metrics.meters :refer [defmeter mark!]]))
+
+(defmeter num-web-requests)
+(defn requests-middleware
+  "Coda Hale metric for counting the number of web requests."
+  [handler]
+  (fn [req]
+    (mark! num-web-requests)
+    (handler req)))
 
 (defn split-divide [val divider]
   [(Integer. (int (/ val divider))) (mod val divider)]


Mime
View raw message