storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/6] storm git commit: change defmeter and defgauge in nimbus to java metrics code
Date Mon, 07 Mar 2016 19:03:29 GMT
Repository: storm
Updated Branches:
  refs/heads/master 96f81d793 -> 7a4824b7f


change defmeter and defgauge in nimbus to java metrics code


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a23533ca
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a23533ca
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a23533ca

Branch: refs/heads/master
Commit: a23533cac02c53742e83e1049783c255489521f7
Parents: 87e3c24
Author: 卫乐 <weiyue.wy@taobao.com>
Authored: Tue Mar 1 23:34:58 2016 +0800
Committer: 卫乐 <weiyue.wy@taobao.com>
Committed: Tue Mar 1 23:34:58 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 122 +++++++++----------
 .../storm/metric/StormMetricsRegistry.java      |  69 +++++++++++
 2 files changed, 130 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a23533ca/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index ed26a79..c05482c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -14,7 +14,8 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.daemon.nimbus
-  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]
+           [org.apache.storm.metric StormMetricsRegistry])
   (:import [org.apache.storm.generated KeyNotFoundException])
   (:import [org.apache.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
@@ -63,40 +64,38 @@
            [org.json.simple JSONValue])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
-  (:require [metrics.meters :refer [defmeter mark!]])
-  (:require [metrics.gauges :refer [defgauge]])
   (:import [org.apache.storm StormTimer])
   (:gen-class
     :methods [^{:static true} [launch [org.apache.storm.scheduler.INimbus] void]]))
 
-(defmeter nimbus:num-submitTopologyWithOpts-calls)
-(defmeter nimbus:num-submitTopology-calls)
-(defmeter nimbus:num-killTopologyWithOpts-calls)
-(defmeter nimbus:num-killTopology-calls)
-(defmeter nimbus:num-rebalance-calls)
-(defmeter nimbus:num-activate-calls)
-(defmeter nimbus:num-deactivate-calls)
-(defmeter nimbus:num-debug-calls)
-(defmeter nimbus:num-setWorkerProfiler-calls)
-(defmeter nimbus:num-getComponentPendingProfileActions-calls)
-(defmeter nimbus:num-setLogConfig-calls)
-(defmeter nimbus:num-uploadNewCredentials-calls)
-(defmeter nimbus:num-beginFileUpload-calls)
-(defmeter nimbus:num-uploadChunk-calls)
-(defmeter nimbus:num-finishFileUpload-calls)
-(defmeter nimbus:num-beginFileDownload-calls)
-(defmeter nimbus:num-downloadChunk-calls)
-(defmeter nimbus:num-getNimbusConf-calls)
-(defmeter nimbus:num-getLogConfig-calls)
-(defmeter nimbus:num-getTopologyConf-calls)
-(defmeter nimbus:num-getTopology-calls)
-(defmeter nimbus:num-getUserTopology-calls)
-(defmeter nimbus:num-getClusterInfo-calls)
-(defmeter nimbus:num-getTopologyInfoWithOpts-calls)
-(defmeter nimbus:num-getTopologyInfo-calls)
-(defmeter nimbus:num-getTopologyPageInfo-calls)
-(defmeter nimbus:num-getComponentPageInfo-calls)
-(defmeter nimbus:num-shutdown-calls)
+(def nimbus:num-submitTopologyWithOpts-calls (StormMetricsRegistry/registerMeter "nimbus:num-submitTopologyWithOpts-calls"))
+(def nimbus:num-submitTopology-calls (StormMetricsRegistry/registerMeter "nimbus:num-submitTopology-calls"))
+(def nimbus:num-killTopologyWithOpts-calls (StormMetricsRegistry/registerMeter "nimbus:num-killTopologyWithOpts-calls"))
+(def nimbus:num-killTopology-calls (StormMetricsRegistry/registerMeter "nimbus:num-killTopology-calls"))
+(def nimbus:num-rebalance-calls (StormMetricsRegistry/registerMeter "nimbus:num-rebalance-calls"))
+(def nimbus:num-activate-calls (StormMetricsRegistry/registerMeter "nimbus:num-activate-calls"))
+(def nimbus:num-deactivate-calls (StormMetricsRegistry/registerMeter "nimbus:num-deactivate-calls"))
+(def nimbus:num-debug-calls (StormMetricsRegistry/registerMeter "nimbus:num-debug-calls"))
+(def nimbus:num-setWorkerProfiler-calls (StormMetricsRegistry/registerMeter "nimbus:num-setWorkerProfiler-calls"))
+(def nimbus:num-getComponentPendingProfileActions-calls (StormMetricsRegistry/registerMeter "nimbus:num-getComponentPendingProfileActions-calls"))
+(def nimbus:num-setLogConfig-calls (StormMetricsRegistry/registerMeter "nimbus:num-setLogConfig-calls"))
+(def nimbus:num-uploadNewCredentials-calls (StormMetricsRegistry/registerMeter "nimbus:num-uploadNewCredentials-calls"))
+(def nimbus:num-beginFileUpload-calls (StormMetricsRegistry/registerMeter "nimbus:num-beginFileUpload-calls"))
+(def nimbus:num-uploadChunk-calls (StormMetricsRegistry/registerMeter "nimbus:num-uploadChunk-calls"))
+(def nimbus:num-finishFileUpload-calls (StormMetricsRegistry/registerMeter "nimbus:num-finishFileUpload-calls"))
+(def nimbus:num-beginFileDownload-calls (StormMetricsRegistry/registerMeter "nimbus:num-beginFileDownload-calls"))
+(def nimbus:num-downloadChunk-calls (StormMetricsRegistry/registerMeter "nimbus:num-downloadChunk-calls"))
+(def nimbus:num-getNimbusConf-calls (StormMetricsRegistry/registerMeter "nimbus:num-getNimbusConf-calls"))
+(def nimbus:num-getLogConfig-calls (StormMetricsRegistry/registerMeter "nimbus:num-getLogConfig-calls"))
+(def nimbus:num-getTopologyConf-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyConf-calls"))
+(def nimbus:num-getTopology-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopology-calls"))
+(def nimbus:num-getUserTopology-calls (StormMetricsRegistry/registerMeter "nimbus:num-getUserTopology-calls"))
+(def nimbus:num-getClusterInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getClusterInfo-calls"))
+(def nimbus:num-getTopologyInfoWithOpts-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyInfoWithOpts-calls"))
+(def nimbus:num-getTopologyInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyInfo-calls"))
+(def nimbus:num-getTopologyPageInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getTopologyPageInfo-calls"))
+(def nimbus:num-getComponentPageInfo-calls (StormMetricsRegistry/registerMeter "nimbus:num-getComponentPageInfo-calls"))
+(def nimbus:num-shutdown-calls (StormMetricsRegistry/registerMeter "nimbus:num-shutdown-calls"))
 
 (def STORM-VERSION (VersionInfo/getVersion))
 
@@ -1487,8 +1486,8 @@
       (fn []
         (renew-credentials nimbus)))
 
-    (defgauge nimbus:num-supervisors
-      (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
+    (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge "nimbus:num-supervisors"
+      (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))))
 
     (start-metrics-reporters conf)
 
@@ -1497,7 +1496,7 @@
         [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
          ^SubmitOptions submitOptions]
         (try
-          (mark! nimbus:num-submitTopologyWithOpts-calls)
+          (.mark nimbus:num-submitTopologyWithOpts-calls)
           (is-leader nimbus)
           (assert (not-nil? submitOptions))
           (validate-topology-name! storm-name)
@@ -1577,16 +1576,16 @@
 
       (^void submitTopology
         [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
-        (mark! nimbus:num-submitTopology-calls)
+        (.mark nimbus:num-submitTopology-calls)
         (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                  (SubmitOptions. TopologyInitialStatus/ACTIVE)))
 
       (^void killTopology [this ^String name]
-        (mark! nimbus:num-killTopology-calls)
+        (.mark nimbus:num-killTopology-calls)
         (.killTopologyWithOpts this name (KillOptions.)))
 
       (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
-        (mark! nimbus:num-killTopologyWithOpts-calls)
+        (.mark nimbus:num-killTopologyWithOpts-calls)
         (check-storm-active! nimbus storm-name true)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
               storm-id (topology-conf STORM-ID)
@@ -1603,7 +1602,7 @@
             nimbus topology-conf)))
 
       (^void rebalance [this ^String storm-name ^RebalanceOptions options]
-        (mark! nimbus:num-rebalance-calls)
+        (.mark nimbus:num-rebalance-calls)
         (check-storm-active! nimbus storm-name true)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
               operation "rebalance"]
@@ -1624,7 +1623,7 @@
             (notify-topology-action-listener nimbus storm-name operation))))
 
       (activate [this storm-name]
-        (mark! nimbus:num-activate-calls)
+        (.mark nimbus:num-activate-calls)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
               operation "activate"]
           (check-authorization! nimbus storm-name topology-conf operation)
@@ -1632,7 +1631,7 @@
           (notify-topology-action-listener nimbus storm-name operation)))
 
       (deactivate [this storm-name]
-        (mark! nimbus:num-deactivate-calls)
+        (.mark nimbus:num-deactivate-calls)
         (let [topology-conf (try-read-storm-conf-from-name conf storm-name nimbus)
               operation "deactivate"]
           (check-authorization! nimbus storm-name topology-conf operation)
@@ -1640,7 +1639,7 @@
           (notify-topology-action-listener nimbus storm-name operation)))
 
       (debug [this storm-name component-id enable? samplingPct]
-        (mark! nimbus:num-debug-calls)
+        (.mark nimbus:num-debug-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               storm-id (get-storm-id storm-cluster-state storm-name)
               topology-conf (try-read-storm-conf conf storm-id blob-store)
@@ -1661,7 +1660,7 @@
 
       (^void setWorkerProfiler
         [this ^String id ^ProfileRequest profileRequest]
-        (mark! nimbus:num-setWorkerProfiler-calls)
+        (.mark nimbus:num-setWorkerProfiler-calls)
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler")
@@ -1670,7 +1669,7 @@
 
       (^List getComponentPendingProfileActions
         [this ^String id ^String component_id ^ProfileAction action]
-        (mark! nimbus:num-getComponentPendingProfileActions-calls)
+        (.mark nimbus:num-getComponentPendingProfileActions-calls)
         (let [info (get-common-topo-info id "getComponentPendingProfileActions")
               storm-cluster-state (:storm-cluster-state info)
               task->component (:task->component info)
@@ -1693,7 +1692,7 @@
           latest-profile-actions))
 
       (^void setLogConfig [this ^String id ^LogConfig log-config-msg]
-        (mark! nimbus:num-setLogConfig-calls)
+        (.mark nimbus:num-setLogConfig-calls)
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "setLogConfig")
@@ -1719,7 +1718,7 @@
             (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
 
       (uploadNewCredentials [this storm-name credentials]
-        (mark! nimbus:num-uploadNewCredentials-calls)
+        (.mark nimbus:num-uploadNewCredentials-calls)
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               storm-id (get-storm-id storm-cluster-state storm-name)
               topology-conf (try-read-storm-conf conf storm-id blob-store)
@@ -1728,7 +1727,7 @@
           (locking (:cred-update-lock nimbus) (.setCredentials storm-cluster-state storm-id (thriftify-credentials creds) topology-conf))))
 
       (beginFileUpload [this]
-        (mark! nimbus:num-beginFileUpload-calls)
+        (.mark nimbus:num-beginFileUpload-calls)
         (check-authorization! nimbus nil nil "fileUpload")
         (let [fileloc (str (inbox nimbus) "/stormjar-" (Utils/uuid) ".jar")]
           (.put (:uploaders nimbus)
@@ -1739,7 +1738,7 @@
           ))
 
       (^void uploadChunk [this ^String location ^ByteBuffer chunk]
-        (mark! nimbus:num-uploadChunk-calls)
+        (.mark nimbus:num-uploadChunk-calls)
         (check-authorization! nimbus nil nil "fileUpload")
         (let [uploaders (:uploaders nimbus)
               ^WritableByteChannel channel (.get uploaders location)]
@@ -1751,7 +1750,7 @@
           ))
 
       (^void finishFileUpload [this ^String location]
-        (mark! nimbus:num-finishFileUpload-calls)
+        (.mark nimbus:num-finishFileUpload-calls)
         (check-authorization! nimbus nil nil "fileUpload")
         (let [uploaders (:uploaders nimbus)
               ^WritableByteChannel channel (.get uploaders location)]
@@ -1765,7 +1764,7 @@
 
       (^String beginFileDownload
         [this ^String file]
-        (mark! nimbus:num-beginFileDownload-calls)
+        (.mark nimbus:num-beginFileDownload-calls)
         (check-authorization! nimbus nil nil "fileDownload")
         (let [is (BufferInputStream. (.getBlob (:blob-store nimbus) file nil) 
               ^Integer (Utils/getInt (conf STORM-BLOBSTORE-INPUTSTREAM-BUFFER-SIZE-BYTES)
@@ -1775,7 +1774,7 @@
           id))
 
       (^ByteBuffer downloadChunk [this ^String id]
-        (mark! nimbus:num-downloadChunk-calls)
+        (.mark nimbus:num-downloadChunk-calls)
         (check-authorization! nimbus nil nil "fileDownload")
         (let [downloaders (:downloaders nimbus)
               ^BufferFileInputStream is (.get downloaders id)]
@@ -1790,12 +1789,12 @@
             )))
 
       (^String getNimbusConf [this]
-        (mark! nimbus:num-getNimbusConf-calls)
+        (.mark nimbus:num-getNimbusConf-calls)
         (check-authorization! nimbus nil nil "getNimbusConf")
         (JSONValue/toJSONString (:conf nimbus)))
 
       (^LogConfig getLogConfig [this ^String id]
-        (mark! nimbus:num-getLogConfig-calls)
+        (.mark nimbus:num-getLogConfig-calls)
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf "getLogConfig")
@@ -1804,28 +1803,28 @@
            (if log-config log-config (LogConfig.))))
 
       (^String getTopologyConf [this ^String id]
-        (mark! nimbus:num-getTopologyConf-calls)
+        (.mark nimbus:num-getTopologyConf-calls)
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopologyConf")
               (JSONValue/toJSONString topology-conf)))
 
       (^StormTopology getTopology [this ^String id]
-        (mark! nimbus:num-getTopology-calls)
+        (.mark nimbus:num-getTopology-calls)
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getTopology")
               (system-topology! topology-conf (try-read-storm-topology id (:blob-store nimbus)))))
 
       (^StormTopology getUserTopology [this ^String id]
-        (mark! nimbus:num-getUserTopology-calls)
+        (.mark nimbus:num-getUserTopology-calls)
         (let [topology-conf (try-read-storm-conf conf id (:blob-store nimbus))
               storm-name (topology-conf TOPOLOGY-NAME)]
               (check-authorization! nimbus storm-name topology-conf "getUserTopology")
               (try-read-storm-topology id blob-store)))
 
       (^ClusterSummary getClusterInfo [this]
-        (mark! nimbus:num-getClusterInfo-calls)
+        (.mark nimbus:num-getClusterInfo-calls)
         (check-authorization! nimbus nil nil "getClusterInfo")
         (let [storm-cluster-state (:storm-cluster-state nimbus)
               supervisor-infos (all-supervisor-info storm-cluster-state)
@@ -1892,7 +1891,7 @@
               ret))
 
       (^TopologyInfo getTopologyInfoWithOpts [this ^String storm-id ^GetInfoOptions options]
-        (mark! nimbus:num-getTopologyInfoWithOpts-calls)
+        (.mark nimbus:num-getTopologyInfoWithOpts-calls)
         (let [{:keys [storm-name
                       storm-cluster-state
                       all-components
@@ -1955,7 +1954,7 @@
           topo-info))
 
       (^TopologyInfo getTopologyInfo [this ^String topology-id]
-        (mark! nimbus:num-getTopologyInfo-calls)
+        (.mark nimbus:num-getTopologyInfo-calls)
         (.getTopologyInfoWithOpts this
                                   topology-id
                                   (doto (GetInfoOptions.) (.set_num_err_choice NumErrorsChoice/ALL))))
@@ -2110,7 +2109,7 @@
 
       (^TopologyPageInfo getTopologyPageInfo
         [this ^String topo-id ^String window ^boolean include-sys?]
-        (mark! nimbus:num-getTopologyPageInfo-calls)
+        (.mark nimbus:num-getTopologyPageInfo-calls)
         (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
 
               exec->node+port (:executor->node+port (:assignment info))
@@ -2158,7 +2157,7 @@
          ^String component-id
          ^String window
          ^boolean include-sys?]
-        (mark! nimbus:num-getComponentPageInfo-calls)
+        (.mark nimbus:num-getComponentPageInfo-calls)
         (let [info (get-common-topo-info topo-id "getComponentPageInfo")
               {:keys [executor->node+port node->host]} (:assignment info)
               ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
@@ -2219,7 +2218,7 @@
 
       Shutdownable
       (shutdown [this]
-        (mark! nimbus:num-shutdown-calls)
+        (.mark nimbus:num-shutdown-calls)
         (log-message "Shutting down master")
         (.close (:timer nimbus))
         (.disconnect (:storm-cluster-state nimbus))
@@ -2301,3 +2300,4 @@
 (defn -main []
   (Utils/setupDefaultUncaughtExceptionHandler)
   (-launch (standalone-nimbus)))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a23533ca/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
new file mode 100644
index 0000000..eef69d0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import clojure.lang.IFn;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unchecked")
+public class StormMetricsRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
+    private static final MetricRegistry metrics = new MetricRegistry();
+
+    public static Meter registerMeter(String name) {
+        Meter meter = new Meter();
+        return register(name, meter);
+    }
+
+    // TODO: should replace fn to Gauge<Integer> when nimbus.clj is translated to java
+    public static Gauge<Integer> registerGauge(final String name, final IFn fn) {
+        Gauge<Integer> gauge = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                try {
+                    return (Integer) fn.call();
+                } catch (Exception e) {
+                    LOG.error("Error getting gauge value for {}", name, e);
+                }
+                return 0;
+            }
+        };
+        return register(name, gauge);
+    }
+
+    private static <T extends Metric> T register(String name, T metric) {
+        T ret;
+        try {
+            ret = metrics.register(name, metric);
+        } catch (IllegalArgumentException e) {
+            // swallow IllegalArgumentException when the metric exists already
+            ret = (T) metrics.getMetrics().get(name);
+            if (ret == null) {
+                throw e;
+            } else {
+                LOG.warn("Metric {} has already been registered", name);
+            }
+        }
+        return ret;
+    }
+}
\ No newline at end of file


Mime
View raw message