beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [2/4] beam git commit: jstorm-runner: support distribution metrics, this is required by nexmark.
Date Fri, 08 Sep 2017 06:43:54 GMT
jstorm-runner: support distribution metrics, this is required by nexmark.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c388447
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c388447
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c388447

Branch: refs/heads/jstorm-runner
Commit: 0c388447813a93ab9afc45af591417a82f8c4b1b
Parents: 56ad7a8
Author: Pei He <pei@apache.org>
Authored: Wed Sep 6 11:05:05 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Fri Sep 8 14:42:28 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/TestJStormRunner.java   |  1 +
 .../jstorm/translation/JStormMetricResults.java | 38 +++++++++++--
 .../jstorm/translation/MetricsReporter.java     | 56 ++++++++++++++++++++
 3 files changed, 92 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index b28c127..e9f6337 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -61,6 +61,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult>
{
   private TestJStormRunner(JStormPipelineOptions options) {
     this.options = options;
     Map conf = Maps.newHashMap();
+    conf.put("topology.metric.sample.rate", 1);
     // Default state backend is RocksDB, for the users who could not run RocksDB on local
testing
     // env, following config is used to configure state backend to memory.
     // conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());

http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
index 986bf0c..01d4441 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
@@ -22,7 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.alibaba.jstorm.common.metric.AsmCounter;
 import com.alibaba.jstorm.common.metric.AsmGauge;
 import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.snapshot.AsmHistogramSnapshot;
 import com.alibaba.jstorm.metric.AsmWindow;
+import com.alibaba.jstorm.metrics.Snapshot;
 import com.google.auto.value.AutoValue;
 import java.util.ArrayList;
 import java.util.List;
@@ -87,7 +89,35 @@ public class JStormMetricResults extends MetricResults {
                   new Instant(0))));
     }
 
-    return JStormMetricQueryResults.create(counters, gauges);
+    List<MetricResult<DistributionResult>> distributions = new ArrayList<>();
+    for (Map.Entry<String, AsmHistogram> entry : histogramMap.entrySet()) {
+      MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey());
+      if (!MetricFiltering.matches(filter, metricKey)) {
+        continue;
+      }
+      AsmHistogram histogram = entry.getValue();
+      histogram.forceFlush();
+
+      Snapshot snapshot =
+          ((AsmHistogramSnapshot) histogram.getSnapshots().get(AsmWindow.M10_WINDOW)).getSnapshot();
+      // TODO: Sum and count might be under estimated, because JStorm histogram only store
a fixed
+      // number of values.
+      long sum = 0;
+      for (long v : snapshot.getValues()) {
+        sum += v;
+      }
+      distributions.add(
+          JStormMetricResult.create(
+              metricKey.metricName(),
+              metricKey.stepName(),
+              DistributionResult.create(
+                  sum,
+                  snapshot.size(),
+                  snapshot.getMin(),
+                  snapshot.getMax())));
+    }
+
+    return JStormMetricQueryResults.create(counters, gauges, distributions);
   }
 
   @AutoValue
@@ -97,8 +127,10 @@ public class JStormMetricResults extends MetricResults {
 
     public static MetricQueryResults create(
         Iterable<MetricResult<Long>> counters,
-        Iterable<MetricResult<GaugeResult>> gauges) {
-      return new AutoValue_JStormMetricResults_JStormMetricQueryResults(counters, gauges,
null);
+        Iterable<MetricResult<GaugeResult>> gauges,
+        Iterable<MetricResult<DistributionResult>> distributions) {
+      return new AutoValue_JStormMetricResults_JStormMetricQueryResults(
+          counters, gauges, distributions);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
index 1e38d1c..7867a83 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -21,12 +21,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import com.alibaba.jstorm.common.metric.AsmCounter;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
 import com.alibaba.jstorm.metric.MetricClient;
 import com.alibaba.jstorm.metrics.Gauge;
 import com.google.common.collect.Maps;
 import java.util.Map;
 import org.apache.beam.runners.core.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -45,6 +47,7 @@ class MetricsReporter {
 
   private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
   private final Map<String, Long> reportedCounters = Maps.newHashMap();
+  private final Map<String, DistributionResult> reportedDistributions = Maps.newHashMap();
   private final MetricClient metricClient;
 
   public static MetricsReporter create(MetricClient metricClient) {
@@ -77,6 +80,7 @@ class MetricsReporter {
         metricResults.queryMetrics(MetricsFilter.builder().build());
     updateCounters(metricQueryResults.counters());
     updateGauges(metricQueryResults.gauges());
+    updateDistributions(metricQueryResults.distributions());
   }
 
   private void updateCounters(Iterable<MetricResult<Long>> counters) {
@@ -105,6 +109,58 @@ class MetricsReporter {
     }
   }
 
+  private void updateDistributions(Iterable<MetricResult<DistributionResult>>
distributions) {
+    for (final MetricResult<DistributionResult> distributionResult : distributions)
{
+      String metricName = getMetricNameString(COUNTER_PREFIX, distributionResult);
+      AsmHistogram histogram = metricClient.registerHistogram(metricName);
+      DistributionResult distribution = distributionResult.attempted();
+      if (distribution.count() == 0) {
+        return;
+      }
+      DistributionResult oldDistribution = reportedDistributions.get(metricName);
+      reportedDistributions.put(metricName, distribution);
+      Long newMin;
+      Long newMax;
+      long restCount;
+      long restSum;
+      if (oldDistribution == null) {
+        newMin = distribution.min();
+        newMax = distribution.min() != distribution.max() ? distribution.max() : null;
+        restCount = distribution.count();
+        restSum = distribution.sum();
+      } else {
+        newMin = distribution.min() < oldDistribution.min() ? distribution.min() : null;
+        newMax =
+            (distribution.max() > oldDistribution.max() && distribution.min()
!= distribution.max())
+                ? distribution.max() : null;
+        restCount = distribution.count() - oldDistribution.count();
+        restSum = distribution.sum() - oldDistribution.sum();
+      }
+      if (newMin != null) {
+        histogram.update(newMin);
+        restCount--;
+        restSum -= newMin;
+      }
+      if (newMax != null) {
+        histogram.update(newMax);
+        restCount--;
+        restSum -= newMax;
+      }
+      if (restCount == 0) {
+        return;
+      }
+      long restAvg = restSum / restCount;
+      long restMod = restSum % restCount;
+      for (long i = 0; i < restCount; ++i) {
+        if (i == 0) {
+          histogram.update(restAvg + restMod);
+        } else {
+          histogram.update(restAvg);
+        }
+      }
+    }
+  }
+
   private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
     return prefix
         + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.step()


Mime
View raw message