storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [14/18] storm git commit: Added in a skipped max spout pending metric
Date Thu, 07 Sep 2017 19:14:55 GMT
Added in a skipped max spout pending metric


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96e68c02
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96e68c02
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96e68c02

Branch: refs/heads/master
Commit: 96e68c02453eb1884785db1b32e387782f576a29
Parents: 2f5cde8
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Fri Sep 1 09:47:40 2017 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Fri Sep 1 09:47:40 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                    |  3 ++-
 .../org/apache/storm/loadgen/LoadMetricsServer.java | 16 ++++++++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/96e68c02/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 52f5358..6c476bd 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -118,7 +118,7 @@ There are a lot of different metrics supported
 |end_time| The ending time of the metrics window, measured from when the first topology was launched.
| all
 |time_window| the length in seconds for the time window. | all
 |ids| The topology ids that are being tracked | all
-|congested| Componets that appear to be congested | all
+|congested| Components that appear to be congested | all
 |storm_version| The version of storm as reported by the client | all
 |java_version| The version of java as reported by the client | all
 |os_arch| The OS architecture as reported by the client | all
@@ -128,6 +128,7 @@ There are a lot of different metrics supported
 |hosts| The number of hosts the monitored topologies are running on| all
 |executors| The number of running executors in the monitored topologies | all
 |workers| The number of workers the monitored topologies are running on | all
+|skipped\_max\_spout| The number of ms in total that the spout reported it skipped trying
to emit because of `topology.max.spout.pending`. This is the sum for all spouts and can be
used to decide if setting the value higher will likely improve throughput. `congested` reports
individual spouts that appear to be slowed down by this to a large degree. | all
 |target_rate| The target rate in sentences per second for the ThroughputVsLatency topology
| ThroughputVsLatency
 |spout_parallel| The parallelism of the spout for the `ThroughputVsLatency` topology. | ThroughputVsLatency
 |split_parallel| The parallelism of the split bolt for the `ThroughputVsLatency` topology.
| ThroughputVsLatency

http://git-wip-us.apache.org/repos/asf/storm/blob/96e68c02/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 31bea1f..e6c1616 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -91,6 +91,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
 
     public static class Measurements {
         private final Histogram histo;
+        private long skippedMaxSpoutMs;
         private double userMs;
         private double sysMs;
         private double gcMs;
@@ -114,7 +115,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
          */
         public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram
histo,
                             double userMs, double sysMs, double gcMs, long memBytes, Set<String>
topologyIds,
-                            long workers, long executors, long hosts, Map<String, String>
congested) {
+                            long workers, long executors, long hosts, Map<String, String>
congested, long skippedMaxSpoutMs) {
             this.uptimeSecs = uptimeSecs;
             this.acked = acked;
             this.timeWindow = timeWindow;
@@ -129,6 +130,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             this.executors = executors;
             this.hosts = hosts;
             this.congested = congested;
+            this.skippedMaxSpoutMs = skippedMaxSpoutMs;
         }
 
         /**
@@ -149,6 +151,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             executors = 0;
             hosts = 0;
             congested = new HashMap<>();
+            skippedMaxSpoutMs = 0;
         }
 
         /**
@@ -170,6 +173,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             executors = Math.max(executors, other.executors);
             hosts = Math.max(hosts, other.hosts);
             congested.putAll(other.congested);
+            skippedMaxSpoutMs += other.skippedMaxSpoutMs;
         }
 
         public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -204,6 +208,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             return convert(gcMs, TimeUnit.MILLISECONDS, unit);
         }
 
+        public double getSkippedMaxSpout(TimeUnit unit) {
+            return convert(skippedMaxSpoutMs, TimeUnit.MILLISECONDS, unit);
+        }
+
         public double getMemMb() {
             return memBytes / (1024.0 * 1024.0);
         }
@@ -397,6 +405,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         tmp.put("user_cpu", new MetricExtractor((m, unit) -> m.getUserTime(unit)));
         tmp.put("sys_cpu", new MetricExtractor((m, unit) -> m.getSysTime(unit)));
         tmp.put("gc_cpu", new MetricExtractor((m, unit) -> m.getGc(unit)));
+        tmp.put("skipped_max_spout", new MetricExtractor((m, unit) -> m.getSkippedMaxSpout(unit)));
         tmp.put("acked",  new MetricExtractor((m, unit) -> m.getAcked(), ""));
         tmp.put("acked_rate",  new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
         tmp.put("completed",  new MetricExtractor((m, unit) -> m.getCompleted(), ""));
@@ -730,6 +739,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
     private final AtomicLong userCpu = new AtomicLong(0);
     private final AtomicLong gcCount = new AtomicLong(0);
     private final AtomicLong gcMs = new AtomicLong(0);
+    private final AtomicLong skippedMaxSpoutMs = new AtomicLong(0);
     private final ConcurrentHashMap<String, MemMeasure> memoryBytes = new ConcurrentHashMap<>();
     private final AtomicReference<ConcurrentHashMap<String, String>> congested
= new AtomicReference<>(new ConcurrentHashMap<>());
     private final List<MetricResultsReporter> reporters;
@@ -909,10 +919,11 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         long user = userCpu.getAndSet(0);
         long sys = systemCpu.getAndSet(0);
         long gc = gcMs.getAndSet(0);
+        long skippedMaxSpout = skippedMaxSpoutMs.getAndSet(0);
         long memBytes = readMemory();
 
         allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime,
copy, user, sys, gc, memBytes,
-            ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>())));
+            ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()),
skippedMaxSpout));
         Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
         for (MetricResultsReporter reporter: reporters) {
             reporter.reportWindow(inWindow, allCombined);
@@ -974,6 +985,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                 }
             } else if (dp.name.equals("__skipped-max-spout-ms")) {
                 if (dp.value instanceof Number) {
+                    skippedMaxSpoutMs.getAndAdd(((Number) dp.value).longValue());
                     double full = ((Number) dp.value).doubleValue() / 10_000.0; //The frequency
of reporting
                     if (full >= 0.8) {
                         congested.get().put(


Mime
View raw message