storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [1/2] storm git commit: STORM-3150: Improve Gauge registration methods and refactored code
Date Thu, 02 Aug 2018 17:40:07 GMT
Repository: storm
Updated Branches:
  refs/heads/master 146beff6a -> 5ef7c1d35


STORM-3150: Improve Gauge registration methods and refactored code

STORM-3150: Simplify gauge registration method.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d43947d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d43947d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d43947d

Branch: refs/heads/master
Commit: 4d43947dc87d490efbcdd446a6c45f79f7d9e6ea
Parents: 146beff
Author: Zhengdai Hu <hu.zhengdai@gmail.com>
Authored: Thu Jul 12 11:31:00 2018 -0500
Committer: Zhengdai Hu <hu.zhengdai@gmail.com>
Committed: Thu Aug 2 09:56:59 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 37 +++++++++--------
 .../storm/daemon/supervisor/Supervisor.java     | 13 +-----
 .../storm/metric/StormMetricsRegistry.java      | 42 +++++---------------
 .../metricstore/rocksdb/MetricsCleaner.java     |  9 +----
 .../org/apache/storm/pacemaker/Pacemaker.java   | 15 ++-----
 .../scheduler/blacklist/BlacklistScheduler.java | 10 +----
 6 files changed, 39 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4d43947d/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 5cd7d97..c401f60 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1936,13 +1936,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     private double fragmentedMemory() {
-        Double res = nodeIdToResources.get().values().parallelStream().filter(x -> isFragmented(x) == true)
+        Double res = nodeIdToResources.get().values().parallelStream().filter(this::isFragmented)
                                       .mapToDouble(SupervisorResources::getAvailableMem).filter(x -> x > 0).sum();
         return res.intValue();
     }
 
     private int fragmentedCpu() {
-        Double res = nodeIdToResources.get().values().parallelStream().filter(x -> isFragmented(x) == true)
+        Double res = nodeIdToResources.get().values().parallelStream().filter(this::isFragmented)
                                       .mapToDouble(SupervisorResources::getAvailableCpu).filter(x -> x > 0).sum();
         return res.intValue();
     }
@@ -2808,20 +2808,25 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                                     });
 
             StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size());
-            StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", () -> fragmentedMemory());
-            StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", () -> fragmentedCpu());
-            StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values().parallelStream()
-                                                                                        
        .mapToDouble(
-                                                                                        
            SupervisorResources::getAvailableMem)
-                                                                                        
        .sum());
-            StormMetricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values().parallelStream().mapToDouble(
-                SupervisorResources::getAvailableCpu).sum());
-            StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values().parallelStream()
-                                                                                        
    .mapToDouble(SupervisorResources::getTotalMem)
-                                                                                        
    .sum());
-            StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values().parallelStream()
-                                                                                        
 .mapToDouble(SupervisorResources::getTotalCpu)
-                                                                                        
 .sum());
+            StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory);
+            StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu);
+            StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values()
+                .parallelStream()
+                .mapToDouble(SupervisorResources::getAvailableMem)
+                .sum());
+            StormMetricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values()
+                .parallelStream()
+                .mapToDouble(SupervisorResources::getAvailableCpu)
+                .sum());
+            StormMetricsRegistry.registerGauge("nimbus:total-memory", () -> nodeIdToResources.get().values()
+                .parallelStream()
+                .mapToDouble(SupervisorResources::getTotalMem)
+                .sum());
+            StormMetricsRegistry.registerGauge("nimbus:total-cpu", () -> nodeIdToResources.get().values()
+                .parallelStream()
+                .mapToDouble(SupervisorResources::getTotalCpu)
+                .sum());
+
             StormMetricsRegistry.startMetricsReporters(conf);
 
             if (clusterConsumerExceutors != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4d43947d/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index b8bb4aa..8081290 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -27,7 +27,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
@@ -312,7 +311,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             launch();
             Utils.addShutdownHookWithForceKillIn1Sec(this::close);
 
-            registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
+            StormMetricsRegistry.registerGauge("supervisor:num-slots-used-gauge", () -> SupervisorUtils.supervisorWorkerIds(conf).size());
             StormMetricsRegistry.startMetricsReporters(conf);
 
             // blocking call under the hood, must invoke after launch cause some services must be initialized
@@ -443,16 +442,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         eventManager.add(syn);
     }
 
-    private void registerWorkerNumGauge(String name, final Map<String, Object> conf) {
-        StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
-            @Override
-            public Integer call() throws Exception {
-                Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf);
-                return pids.size();
-            }
-        });
-    }
-
     @Override
     public void close() {
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/4d43947d/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index 3bc50e7..2ba832b 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -19,7 +19,7 @@ import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Reservoir;
 import java.util.Map;
-import java.util.concurrent.Callable;
+
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
@@ -27,52 +27,30 @@ import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("unchecked")
 public class StormMetricsRegistry {
-    public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry();
+    private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry();
     private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
 
-    public static Meter registerMeter(String name) {
-        Meter meter = new Meter();
-        return register(name, meter);
+    public static Meter registerMeter(final String name) {
+        return register(name, new Meter());
     }
 
-    // TODO: should replace Callable to Gauge<Integer> when nimbus.clj is translated to java
-    public static Gauge<Integer> registerGauge(final String name, final Callable fn) {
-        Gauge<Integer> gauge = new Gauge<Integer>() {
-            @Override
-            public Integer getValue() {
-                try {
-                    return (Integer) fn.call();
-                } catch (Exception e) {
-                    LOG.error("Error getting gauge value for {}", name, e);
-                }
-                return 0;
-            }
-        };
+    public static <V> Gauge<V> registerGauge(final String name, final Gauge<V> gauge) {
         return register(name, gauge);
     }
 
-    public static void registerProvidedGauge(final String name, Gauge gauge) {
-        register(name, gauge);
-    }
-
     public static Histogram registerHistogram(String name, Reservoir reservoir) {
-        Histogram histogram = new Histogram(reservoir);
-        return register(name, histogram);
+        return register(name, new Histogram(reservoir));
     }
 
     public static void startMetricsReporters(Map<String, Object> topoConf) {
         for (PreparableReporter reporter : MetricsUtils.getPreparableReporters(topoConf)) {
-            startMetricsReporter(reporter, topoConf);
+            reporter.prepare(StormMetricsRegistry.DEFAULT_REGISTRY, topoConf);
+            reporter.start();
+            LOG.info("Started statistics report plugin...");
         }
     }
 
-    private static void startMetricsReporter(PreparableReporter reporter, Map<String, Object> topoConf) {
-        reporter.prepare(StormMetricsRegistry.DEFAULT_REGISTRY, topoConf);
-        reporter.start();
-        LOG.info("Started statistics report plugin...");
-    }
-
-    private static <T extends Metric> T register(String name, T metric) {
+    private static <T extends Metric> T register(final String name, T metric) {
         T ret;
         try {
             ret = DEFAULT_REGISTRY.register(name, metric);

http://git-wip-us.apache.org/repos/asf/storm/blob/4d43947d/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
index 07fe862..900f8b9 100644
--- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
@@ -11,7 +11,6 @@
 
 package org.apache.storm.metricstore.rocksdb;
 
-import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.metricstore.FilterOptions;
@@ -40,13 +39,7 @@ public class MetricsCleaner implements Runnable, AutoCloseable {
         }
         this.failureMeter = failureMeter;
 
-        Gauge<Long> gauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return purgeTimestamp;
-            }
-        };
-        StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", gauge);
+        StormMetricsRegistry.registerGauge("MetricsCleaner:purgeTimestamp", () -> purgeTimestamp);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/4d43947d/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
index 66e14fa..3efb47c 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
@@ -19,7 +19,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBMessageData;
@@ -44,20 +43,14 @@ public class Pacemaker implements IServerMessageHandler {
     private final static Meter meterTotalSentSize = StormMetricsRegistry.registerMeter("pacemaker:total-sent-size");
     private final static Histogram histogramHeartbeatSize =
         StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir());
-    private Map<String, byte[]> heartbeats;
-    private Map<String, Object> conf;
+    private final Map<String, byte[]> heartbeats;
+    private final Map<String, Object> conf;
 
 
     public Pacemaker(Map<String, Object> conf) {
-        heartbeats = new ConcurrentHashMap();
+        heartbeats = new ConcurrentHashMap<>();
         this.conf = conf;
-        StormMetricsRegistry.registerGauge("pacemaker:size-total-keys",
-                                           new Callable() {
-                                               @Override
-                                               public Integer call() throws Exception {
-                                                   return heartbeats.size();
-                                               }
-                                           });
+        StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
         StormMetricsRegistry.startMetricsReporters(conf);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4d43947d/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index 9f1a123..5034f42 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -17,7 +17,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
@@ -89,13 +88,8 @@ public class BlacklistScheduler implements IScheduler {
         cachedSupervisors = new HashMap<>();
         blacklistHost = new HashSet<>();
 
-        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new Callable<Integer>() {
-            @Override
-            public Integer call() throws Exception {
-                //nimbus:num-blacklisted-supervisor + none blacklisted supervisor = nimbus:num-supervisors
-                return blacklistHost.size();
-            }
-        });
+        //nimbus:num-blacklisted-supervisor + non-blacklisted supervisor = nimbus:num-supervisors
+        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", () -> blacklistHost.size());
     }
 
     @Override


Mime
View raw message