beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [1/4] beam git commit: jstorm-runner: saves MetricResults in local JStormRunnerResult after cancelling.
Date Fri, 08 Sep 2017 06:43:53 GMT
Repository: beam
Updated Branches:
  refs/heads/jstorm-runner d2b285122 -> ef70031b7


jstorm-runner: saves MetricResults in local JStormRunnerResult after cancelling.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56ad7a85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56ad7a85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56ad7a85

Branch: refs/heads/jstorm-runner
Commit: 56ad7a852acf66ebd1061e276317481629b270c4
Parents: d2b2851
Author: Pei He <pei@apache.org>
Authored: Wed Sep 6 11:05:51 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Fri Sep 8 14:42:27 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunnerResult.java | 38 +++++++++++++++++++-
 .../beam/runners/jstorm/TestJStormRunner.java   | 18 ----------
 .../jstorm/translation/JStormMetricResults.java | 25 +++++++++----
 3 files changed, 56 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index 3962ca2..8848717 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -23,10 +23,13 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmMetric;
 import com.alibaba.jstorm.metric.AsmMetricRegistry;
 import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
 import com.alibaba.jstorm.utils.JStormUtils;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.CommonInstance;
 import org.apache.beam.runners.jstorm.translation.JStormMetricResults;
@@ -62,6 +65,7 @@ public abstract class JStormRunnerResult implements PipelineResult {
     this.topologyName = checkNotNull(topologyName, "topologyName");
   }
 
+  @Override
   public State getState() {
     return null;
   }
@@ -79,6 +83,7 @@ public abstract class JStormRunnerResult implements PipelineResult {
     private final LocalCluster localCluster;
     private final long localModeExecuteTimeSecs;
     private boolean cancelled;
+    private MetricResults savedMetricResults;
 
     LocalJStormPipelineResult(
         String topologyName,
@@ -89,12 +94,27 @@ public abstract class JStormRunnerResult implements PipelineResult {
       this.localCluster = checkNotNull(localCluster, "localCluster");
       this.localModeExecuteTimeSecs = localModeExecuteTimeSecs;
       this.cancelled = false;
+      this.savedMetricResults = null;
+    }
+
+    @Override
+    public State getState() {
+      if (cancelled) {
+        return State.CANCELLED;
+      } else if (globalWatermark() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        return State.DONE;
+      } else {
+        return State.RUNNING;
+      }
     }
 
     @Override
     public State cancel() throws IOException {
+      savedMetricResults = metrics();
       localCluster.killTopology(getTopologyName());
       localCluster.shutdown();
+      clearPAssertCount();
+      TaskReportErrorAndDie.setExceptionRecord(null);
       JStormUtils.sleepMs(1000);
       cancelled = true;
       return State.CANCELLED;
@@ -129,7 +149,12 @@ public abstract class JStormRunnerResult implements PipelineResult {
 
     @Override
     public MetricResults metrics() {
-      return new JStormMetricResults();
+      if (savedMetricResults != null) {
+        return savedMetricResults;
+      }
+      AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics();
+      return new JStormMetricResults(
+          metricRegistry.getCounters(), metricRegistry.getGauges(), metricRegistry.getHistograms());
     }
 
     private long globalWatermark() {
@@ -151,5 +176,16 @@ public abstract class JStormRunnerResult implements PipelineResult {
         return BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
       }
     }
+
+    private void clearPAssertCount() {
+      AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
+      Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator();
+      while (itr.hasNext()) {
+        Map.Entry<String, AsmMetric> metric = itr.next();
+        if (metric.getKey().contains(getTopologyName())) {
+          itr.remove();
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index b637b7c..b28c127 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -19,15 +19,11 @@ package org.apache.beam.runners.jstorm;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.alibaba.jstorm.common.metric.AsmMetric;
-import com.alibaba.jstorm.metric.AsmMetricRegistry;
-import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
@@ -111,9 +107,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
         throw new AssertionError("Assertion checks timed out.");
       }
     } finally {
-      clearPAssertCount();
       cancel(result);
-      TaskReportErrorAndDie.setExceptionRecord(null);
     }
   }
 
@@ -188,18 +182,6 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
     }
   }
 
-  private void clearPAssertCount() {
-    String topologyName = options.getJobName();
-    AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
-    Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator();
-    while (itr.hasNext()) {
-      Map.Entry<String, AsmMetric> metric = itr.next();
-      if (metric.getKey().contains(topologyName)) {
-        itr.remove();
-      }
-    }
-  }
-
   private void cancel(JStormRunnerResult result) {
     try {
       result.cancel();

http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
index dbaa28e..986bf0c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.jstorm.translation;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.alibaba.jstorm.common.metric.AsmCounter;
 import com.alibaba.jstorm.common.metric.AsmGauge;
-import com.alibaba.jstorm.metric.AsmMetricRegistry;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
 import com.alibaba.jstorm.metric.AsmWindow;
-import com.alibaba.jstorm.metric.JStormMetrics;
 import com.google.auto.value.AutoValue;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,12 +43,24 @@ import org.joda.time.Instant;
  * Implementation of {@link MetricResults} for the JStorm Runner.
  */
 public class JStormMetricResults extends MetricResults {
+
+  private final Map<String, AsmCounter> counterMap;
+  private final Map<String, AsmGauge> gaugeMap;
+  private final Map<String, AsmHistogram> histogramMap;
+
+  public JStormMetricResults(
+      Map<String, AsmCounter> counterMap,
+      Map<String, AsmGauge> gaugeMap,
+      Map<String, AsmHistogram> histogramMap) {
+    this.counterMap = checkNotNull(counterMap, "counterMap");
+    this.gaugeMap = checkNotNull(gaugeMap, "gaugeMap");
+    this.histogramMap = checkNotNull(histogramMap, "histogramMap");
+  }
+
   @Override
   public MetricQueryResults queryMetrics(MetricsFilter filter) {
-    AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics();
-
     List<MetricResult<Long>> counters = new ArrayList<>();
-    for (Map.Entry<String, AsmCounter> entry : metricRegistry.getCounters().entrySet()) {
+    for (Map.Entry<String, AsmCounter> entry : counterMap.entrySet()) {
       MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey());
       if (!MetricFiltering.matches(filter, metricKey)) {
         continue;
@@ -60,7 +73,7 @@ public class JStormMetricResults extends MetricResults {
     }
 
     List<MetricResult<GaugeResult>> gauges = new ArrayList<>();
-    for (Map.Entry<String, AsmGauge> entry : metricRegistry.getGauges().entrySet()) {
+    for (Map.Entry<String, AsmGauge> entry : gaugeMap.entrySet()) {
       MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey());
       if (!MetricFiltering.matches(filter, metricKey)) {
         continue;


Mime
View raw message