beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aromane...@apache.org
Subject [beam] branch spark-runner_structured-streaming updated: Add metrics support in DoFn
Date Fri, 05 Jul 2019 14:53:14 GMT
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 59cc5dd  Add metrics support in DoFn
59cc5dd is described below

commit 59cc5ddc008afd928cb98ecefe77d93f38f37a7d
Author: Alexey Romanenko <aromanenko.dev@gmail.com>
AuthorDate: Fri Jul 5 16:47:56 2019 +0200

    Add metrics support in DoFn
---
 .../SparkStructuredStreamingPipelineOptions.java   |   8 +-
 .../SparkStructuredStreamingPipelineResult.java    |   7 +-
 .../SparkStructuredStreamingRunner.java            |  60 ++++++++++-
 .../aggregators/AggregatorsAccumulator.java        |  70 +++++++++++++
 .../aggregators/NamedAggregators.java              | 112 +++++++++++++++++++++
 .../aggregators/NamedAggregatorsAccumulator.java   |  63 ++++++++++++
 .../package-info.java}                             |  11 +-
 .../AggregatorMetric.java}                         |  28 ++++--
 .../metrics/AggregatorMetricSource.java            |  49 +++++++++
 .../CompositeSource.java}                          |  34 +++++--
 .../metrics/MetricsAccumulator.java                |  71 +++++++++++++
 .../MetricsContainerStepMapAccumulator.java        |  65 ++++++++++++
 .../metrics/SparkBeamMetric.java                   |  90 +++++++++++++++++
 .../metrics/SparkBeamMetricSource.java             |  48 +++++++++
 .../SparkMetricsContainerStepMap.java}             |  27 +++--
 .../package-info.java}                             |  11 +-
 .../translation/batch/DoFnFunction.java            |  15 ++-
 .../translation/batch/DoFnRunnerWithMetrics.java   |  97 ++++++++++++++++++
 .../translation/batch/ParDoTranslatorBatch.java    |   8 ++
 19 files changed, 829 insertions(+), 45 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
index b115d9b..27743bd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
@@ -18,10 +18,16 @@
 package org.apache.beam.runners.spark.structuredstreaming;
 
 import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
  * master address, and other user-related knobs.
  */
-public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
+public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {
+  @Description("Enable/disable sending aggregator values to Spark's metric sinks")
+  @Default.Boolean(true)
+  Boolean getEnableSparkMetricSinks();
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
index d0198d4..8ccb97a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java
@@ -19,10 +19,14 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import java.io.IOException;
 import javax.annotation.Nullable;
+
+import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.joda.time.Duration;
 
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 /** Represents a Spark pipeline execution result. */
 class SparkStructuredStreamingPipelineResult implements PipelineResult {
 
@@ -50,9 +54,8 @@ class SparkStructuredStreamingPipelineResult implements PipelineResult {
     return null;
   }
 
-  @Nullable // TODO: remove once method will be implemented
   @Override
   public MetricResults metrics() {
-    return null;
+    return asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index c63efe4..29e4373 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -19,15 +19,26 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
+import org.apache.beam.runners.core.metrics.MetricsPusher;
+import org.apache.beam.runners.spark.structuredstreaming.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetricSource;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.CompositeSource;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
 import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.spark.SparkEnv$;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.metrics.MetricsSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,11 +125,29 @@ public final class SparkStructuredStreamingRunner
 
   @Override
   public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
+    MetricsEnvironment.setMetricsSupported(true);
+
+    // clear state of Aggregators, Metrics and Watermarks if exists.
+    AggregatorsAccumulator.clear();
+    MetricsAccumulator.clear();
+
     TranslationContext translationContext = translatePipeline(pipeline);
     // TODO initialise other services: checkpointing, metrics system, listeners, ...
     // TODO pass testMode using pipelineOptions
     translationContext.startPipeline(true);
-    return new SparkStructuredStreamingPipelineResult();
+
+    SparkStructuredStreamingPipelineResult result = new SparkStructuredStreamingPipelineResult();
+
+    if (options.getEnableSparkMetricSinks()) {
+      registerMetricsSource(options.getAppName());
+    }
+
+    MetricsPusher metricsPusher =
+        new MetricsPusher(
+            MetricsAccumulator.getInstance().value(), options.as(MetricsOptions.class), result);
+    metricsPusher.start();
+
+    return result;
   }
 
   private TranslationContext translatePipeline(Pipeline pipeline) {
@@ -129,7 +158,36 @@ public final class SparkStructuredStreamingRunner
         options.isStreaming()
             ? new PipelineTranslatorStreaming(options)
             : new PipelineTranslatorBatch(options);
+
+    final JavaSparkContext jsc =
+        JavaSparkContext.fromSparkContext(
+            pipelineTranslator.getTranslationContext().getSparkSession().sparkContext());
+    initAccumulators(options, jsc);
+
     pipelineTranslator.translate(pipeline);
     return pipelineTranslator.getTranslationContext();
   }
+
+  private void registerMetricsSource(String appName) {
+    final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
+    final AggregatorMetricSource aggregatorMetricSource =
+        new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
+    final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
+    final CompositeSource compositeSource =
+        new CompositeSource(
+            appName + ".Beam",
+            metricsSource.metricRegistry(),
+            aggregatorMetricSource.metricRegistry());
+    // re-register the metrics in case of context re-use
+    metricsSystem.removeSource(compositeSource);
+    metricsSystem.registerSource(compositeSource);
+  }
+
+  /** Init Metrics/Aggregators accumulators. This method is idempotent. */
+  public static void initAccumulators(
+      SparkStructuredStreamingPipelineOptions opts, JavaSparkContext jsc) {
+    // Init metrics accumulators
+    MetricsAccumulator.init(jsc);
+    AggregatorsAccumulator.init(jsc);
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
new file mode 100644
index 0000000..2f31830
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/AggregatorsAccumulator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.aggregators;
+
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.AccumulatorV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * For resilience, {@link AccumulatorV2 Accumulators} are required to be wrapped in a Singleton.
+ *
+ * @see <a
+ *     href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html">accumulatorsV2</a>
+ */
+public class AggregatorsAccumulator {
+  private static final Logger LOG = LoggerFactory.getLogger(AggregatorsAccumulator.class);
+
+  private static final String ACCUMULATOR_NAME = "Beam.Aggregators";
+
+  private static volatile NamedAggregatorsAccumulator instance = null;
+
+  /** Init aggregators accumulator if it has not been initiated. This method is idempotent. */
+  public static void init(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (AggregatorsAccumulator.class) {
+        if (instance == null) {
+          NamedAggregators namedAggregators = new NamedAggregators();
+          NamedAggregatorsAccumulator accumulator =
+              new NamedAggregatorsAccumulator(namedAggregators);
+          jsc.sc().register(accumulator, ACCUMULATOR_NAME);
+
+          instance = accumulator;
+        }
+      }
+      LOG.info("Instantiated aggregators accumulator: " + instance.value());
+    }
+  }
+
+  public static NamedAggregatorsAccumulator getInstance() {
+    if (instance == null) {
+      throw new IllegalStateException("Aggregators accumulator has not been instantiated");
+    } else {
+      return instance;
+    }
+  }
+
+  @VisibleForTesting
+  public static void clear() {
+    synchronized (AggregatorsAccumulator.class) {
+      instance = null;
+    }
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/NamedAggregators.java
new file mode 100644
index 0000000..6fe3659
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/NamedAggregators.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.aggregators;
+
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class wraps a map of named aggregators. Spark expects that all accumulators be declared
+ * before a job is launched. Beam allows aggregators to be used and incremented on the fly. We
+ * create a map of named aggregators and instantiate it in the Spark context before the job is
+ * launched. We can then add aggregators on the fly in Spark.
+ */
+public class NamedAggregators implements Serializable {
+  /** Map from aggregator name to current state. */
+  private final Map<String, State<?, ?, ?>> mNamedAggregators = new TreeMap<>();
+
+  /** Constructs a new NamedAggregators instance. */
+  public NamedAggregators() {}
+
+  /**
+   * @param name Name of aggregator to retrieve.
+   * @param typeClass Type class to cast the value to.
+   * @param <T> Type to be returned.
+   * @return the value of the aggregator associated with the specified name, or <code>null</code> if
+   *     the specified aggregator could not be found.
+   */
+  public <T> T getValue(String name, Class<T> typeClass) {
+    final State<?, ?, ?> state = mNamedAggregators.get(name);
+    return state != null ? typeClass.cast(state.render()) : null;
+  }
+
+  /** @return a map of all the aggregator names and their <b>rendered</b> values */
+  public Map<String, ?> renderAll() {
+    return ImmutableMap.copyOf(Maps.transformValues(mNamedAggregators, State::render));
+  }
+
+  /**
+   * Merges another NamedAggregators instance with this instance.
+   *
+   * @param other The other instance of named aggregators to merge.
+   * @return This instance of Named aggregators with associated states updated to reflect the other
+   *     instance's aggregators.
+   */
+  public NamedAggregators merge(
+      NamedAggregators other) {
+    for (Map.Entry<String, State<?, ?, ?>> e : other.mNamedAggregators.entrySet()) {
+      String key = e.getKey();
+      State<?, ?, ?> otherValue = e.getValue();
+      mNamedAggregators.merge(key, otherValue, NamedAggregators::merge);
+    }
+    return this;
+  }
+
+  /**
+   * Helper method to merge States whose generic types aren't provably the same, so require some
+   * casting.
+   */
+  @SuppressWarnings("unchecked")
+  private static <InputT, InterT, OutputT> State<InputT, InterT, OutputT> merge(
+      State<?, ?, ?> s1, State<?, ?, ?> s2) {
+    return ((State<InputT, InterT, OutputT>) s1).merge((State<InputT, InterT, OutputT>) s2);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, State<?, ?, ?>> e : mNamedAggregators.entrySet()) {
+      sb.append(e.getKey()).append(": ").append(e.getValue().render()).append(" ");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @param <InputT> Input data type
+   * @param <InterT> Intermediate data type (useful for averages)
+   * @param <OutputT> Output data type
+   */
+  public interface State<InputT, InterT, OutputT> extends Serializable {
+
+    /** @param element new element to update state */
+    void update(InputT element);
+
+    State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other);
+
+    InterT current();
+
+    OutputT render();
+
+    Combine.CombineFn<InputT, InterT, OutputT> getCombineFn();
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/NamedAggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/NamedAggregatorsAccumulator.java
new file mode 100644
index 0000000..992e63f
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/NamedAggregatorsAccumulator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.aggregators;
+
+import org.apache.spark.util.AccumulatorV2;
+
+/** {@link AccumulatorV2} implementation for {@link NamedAggregators}. */
+public class NamedAggregatorsAccumulator extends AccumulatorV2<NamedAggregators, NamedAggregators> {
+  private static final NamedAggregators empty = new NamedAggregators();
+
+  private NamedAggregators value;
+
+  public NamedAggregatorsAccumulator(NamedAggregators value) {
+    this.value = value;
+  }
+
+  @Override
+  public boolean isZero() {
+    return value.equals(empty);
+  }
+
+  @Override
+  public NamedAggregatorsAccumulator copy() {
+    NamedAggregators newContainer = new NamedAggregators();
+    newContainer.merge(value);
+    return new NamedAggregatorsAccumulator(newContainer);
+  }
+
+  @Override
+  public void reset() {
+    this.value = new NamedAggregators();
+  }
+
+  @Override
+  public void add(NamedAggregators other) {
+    this.value.merge(other);
+  }
+
+  @Override
+  public void merge(AccumulatorV2<NamedAggregators, NamedAggregators> other) {
+    this.value.merge(other.value());
+  }
+
+  @Override
+  public NamedAggregators value() {
+    return this.value;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/package-info.java
similarity index 65%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
copy to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/package-info.java
index b115d9b..11a87ee 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/package-info.java
@@ -15,13 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming;
 
-import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
- * master address, and other user-related knobs.
- */
-public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
+/** Provides internal utilities for implementing Beam aggregators using Spark accumulators. */
+package org.apache.beam.runners.spark.structuredstreaming.aggregators;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/AggregatorMetric.java
similarity index 52%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
copy to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/AggregatorMetric.java
index b115d9b..55590a6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/AggregatorMetric.java
@@ -15,13 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming;
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
 
-import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
+import com.codahale.metrics.Metric;
+import org.apache.beam.runners.spark.structuredstreaming.aggregators.NamedAggregators;
 
-/**
- * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
- * master address, and other user-related knobs.
- */
-public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
+/** An adapter between the {@link NamedAggregators} and Codahale's {@link Metric} interface. */
+public class AggregatorMetric implements Metric {
+
+  private final NamedAggregators namedAggregators;
+
+  private AggregatorMetric(final NamedAggregators namedAggregators) {
+    this.namedAggregators = namedAggregators;
+  }
+
+  public static AggregatorMetric of(final NamedAggregators namedAggregators) {
+    return new AggregatorMetric(namedAggregators);
+  }
+
+  NamedAggregators getNamedAggregators() {
+    return namedAggregators;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/AggregatorMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/AggregatorMetricSource.java
new file mode 100644
index 0000000..406dba3
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/AggregatorMetricSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.beam.runners.spark.structuredstreaming.aggregators.NamedAggregators;
+import org.apache.spark.metrics.source.Source;
+
+/**
+ * A Spark {@link Source} that is tailored to expose an {@link AggregatorMetric}, wrapping an
+ * underlying {@link NamedAggregators} instance.
+ */
+public class AggregatorMetricSource implements Source {
+  private static final String METRIC_NAME = "Aggregators";
+
+  private final String name;
+
+  private final MetricRegistry metricRegistry = new MetricRegistry();
+
+  public AggregatorMetricSource(final String name, final NamedAggregators aggregators) {
+    this.name = name;
+    metricRegistry.register(METRIC_NAME, AggregatorMetric.of(aggregators));
+  }
+
+  @Override
+  public String sourceName() {
+    return name;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/CompositeSource.java
similarity index 50%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
copy to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/CompositeSource.java
index b115d9b..8de06f7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/CompositeSource.java
@@ -15,13 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming;
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
 
-import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.spark.metrics.source.Source;
 
-/**
- * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
- * master address, and other user-related knobs.
- */
-public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
+/** Composite source made up of several {@link MetricRegistry} instances. */
+public class CompositeSource implements Source {
+  private final String name;
+  private final MetricRegistry metricRegistry;
+
+  public CompositeSource(final String name, MetricRegistry... metricRegistries) {
+    this.name = name;
+    this.metricRegistry = new MetricRegistry();
+    for (MetricRegistry metricRegistry : metricRegistries) {
+      this.metricRegistry.registerAll(metricRegistry);
+    }
+  }
+
+  @Override
+  public String sourceName() {
+    return name;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
new file mode 100644
index 0000000..da5f28d
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsAccumulator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.AccumulatorV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * For resilience, {@link AccumulatorV2 Accumulators} are required to be wrapped in a Singleton.
+ *
+ * @see <a
+ *     href="https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html">accumulatorsV2</a>
+ */
+public class MetricsAccumulator {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsAccumulator.class);
+
+  private static final String ACCUMULATOR_NAME = "Beam.Metrics";
+
+  private static volatile MetricsContainerStepMapAccumulator instance = null;
+
+  /** Init metrics accumulator if it has not been initiated. This method is idempotent. */
+  public static void init(JavaSparkContext jsc) {
+    if (instance == null) {
+      synchronized (MetricsAccumulator.class) {
+        if (instance == null) {
+          MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
+          MetricsContainerStepMapAccumulator accumulator =
+              new MetricsContainerStepMapAccumulator(metricsContainerStepMap);
+          jsc.sc().register(accumulator, ACCUMULATOR_NAME);
+
+          instance = accumulator;
+        }
+      }
+      LOG.info("Instantiated metrics accumulator: " + instance.value());
+    }
+  }
+
+  public static MetricsContainerStepMapAccumulator getInstance() {
+    if (instance == null) {
+      throw new IllegalStateException("Metrics accumulator has not been instantiated");
+    } else {
+      return instance;
+    }
+  }
+
+  @VisibleForTesting
+  public static void clear() {
+    synchronized (MetricsAccumulator.class) {
+      instance = null;
+    }
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsContainerStepMapAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsContainerStepMapAccumulator.java
new file mode 100644
index 0000000..c53ff0e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsContainerStepMapAccumulator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.spark.util.AccumulatorV2;
+
+/** {@link AccumulatorV2} implementation for {@link MetricsContainerStepMap}. */
+public class MetricsContainerStepMapAccumulator
+    extends AccumulatorV2<MetricsContainerStepMap, MetricsContainerStepMap> {
+  // Constant in UPPER_SNAKE_CASE per Java naming conventions; used only as the
+  // "zero" reference value for isZero().
+  private static final MetricsContainerStepMap EMPTY = new MetricsContainerStepMap();
+
+  private MetricsContainerStepMap value;
+
+  public MetricsContainerStepMapAccumulator(MetricsContainerStepMap value) {
+    this.value = value;
+  }
+
+  @Override
+  public boolean isZero() {
+    return value.equals(EMPTY);
+  }
+
+  // Deep-ish copy: a new map is populated from the current value so the copy does not
+  // share mutable state with this accumulator.
+  @Override
+  public MetricsContainerStepMapAccumulator copy() {
+    MetricsContainerStepMap newContainer = new MetricsContainerStepMap();
+    newContainer.updateAll(value);
+    return new MetricsContainerStepMapAccumulator(newContainer);
+  }
+
+  @Override
+  public void reset() {
+    this.value = new MetricsContainerStepMap();
+  }
+
+  @Override
+  public void add(MetricsContainerStepMap other) {
+    this.value.updateAll(other);
+  }
+
+  @Override
+  public void merge(AccumulatorV2<MetricsContainerStepMap, MetricsContainerStepMap> other) {
+    this.value.updateAll(other.value());
+  }
+
+  @Override
+  public MetricsContainerStepMap value() {
+    return this.value;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkBeamMetric.java
new file mode 100644
index 0000000..9d7d129
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkBeamMetric.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import com.codahale.metrics.Metric;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+/**
+ * An adapter between the {@link MetricsContainerStepMap} and Codahale's {@link Metric} interface.
+ */
+class SparkBeamMetric implements Metric {
+  // Any character outside this allowed set is replaced with '_' in rendered names.
+  private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
+
+  /** Renders all Beam metrics gathered so far as a flat name-to-value map. */
+  Map<String, ?> renderAll() {
+    Map<String, Object> metrics = new HashMap<>();
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
+    MetricQueryResults metricQueryResults = metricResults.allMetrics();
+    for (MetricResult<Long> counter : metricQueryResults.getCounters()) {
+      metrics.put(renderName(counter), counter.getAttempted());
+    }
+    for (MetricResult<DistributionResult> distribution : metricQueryResults.getDistributions()) {
+      String baseName = renderName(distribution);
+      DistributionResult result = distribution.getAttempted();
+      metrics.put(baseName + ".count", result.getCount());
+      metrics.put(baseName + ".sum", result.getSum());
+      metrics.put(baseName + ".min", result.getMin());
+      metrics.put(baseName + ".max", result.getMax());
+      metrics.put(baseName + ".mean", result.getMean());
+    }
+    for (MetricResult<GaugeResult> gauge : metricQueryResults.getGauges()) {
+      metrics.put(renderName(gauge), gauge.getAttempted().getValue());
+    }
+    return metrics;
+  }
+
+  /**
+   * Builds a dotted metric name of the form {@code [step.]namespace.name}, with illegal
+   * characters replaced by underscores and a trailing underscore stripped from the step.
+   */
+  @VisibleForTesting
+  String renderName(MetricResult<?> metricResult) {
+    MetricKey key = metricResult.getKey();
+    MetricName name = key.metricName();
+    String step = key.stepName();
+
+    ArrayList<String> pieces = new ArrayList<>();
+    if (step != null) {
+      String sanitizedStep = step.replaceAll(ILLEGAL_CHARACTERS, "_");
+      if (sanitizedStep.endsWith("_")) {
+        sanitizedStep = sanitizedStep.substring(0, sanitizedStep.length() - 1);
+      }
+      pieces.add(sanitizedStep);
+    }
+    pieces.add(name.getNamespace().replaceAll(ILLEGAL_CHARACTERS, "_"));
+    pieces.add(name.getName().replaceAll(ILLEGAL_CHARACTERS, "_"));
+
+    return String.join(".", pieces);
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkBeamMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkBeamMetricSource.java
new file mode 100644
index 0000000..665dbc4
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkBeamMetricSource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.spark.metrics.source.Source;
+
+/**
+ * A Spark {@link Source} that is tailored to expose a {@link SparkBeamMetric}, wrapping an
+ * underlying {@link org.apache.beam.sdk.metrics.MetricResults} instance.
+ */
+public class SparkBeamMetricSource implements Source {
+  // Key under which the single Beam metric is registered in the registry.
+  private static final String METRIC_NAME = "Metrics";
+
+  private final MetricRegistry metricRegistry = new MetricRegistry();
+  private final String name;
+
+  public SparkBeamMetricSource(final String sourceName) {
+    name = sourceName;
+    // The registry holds exactly one entry: the Beam metrics adapter.
+    metricRegistry.register(METRIC_NAME, new SparkBeamMetric());
+  }
+
+  @Override
+  public String sourceName() {
+    return name;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkMetricsContainerStepMap.java
similarity index 56%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
copy to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkMetricsContainerStepMap.java
index b115d9b..fff68f2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/SparkMetricsContainerStepMap.java
@@ -15,13 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming;
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
 
-import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 
 /**
- * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
- * master address, and other user-related knobs.
+ * Sole purpose of this class is to override {@link #toString()} of {@link MetricsContainerStepMap}
+ * in order to show meaningful metrics in Spark Web Interface.
  */
-public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
+class SparkMetricsContainerStepMap extends MetricsContainerStepMap {
+
+  // Renders the aggregated Beam metrics (via SparkBeamMetric) instead of the default
+  // object representation, so the accumulator value is human-readable in the Spark UI.
+  @Override
+  public String toString() {
+    return new SparkBeamMetric().renderAll().toString();
+  }
+
+  // equals/hashCode delegate to the superclass unchanged; the explicit overrides are
+  // presumably here to satisfy static analysis that requires both to be overridden
+  // together — NOTE(review): confirm against the project's checkstyle rules.
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/package-info.java
similarity index 65%
copy from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
copy to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/package-info.java
index b115d9b..16a1a95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/package-info.java
@@ -15,13 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming;
 
-import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the
- * master address, and other user-related knobs.
- */
-public interface SparkStructuredStreamingPipelineOptions extends SparkCommonPipelineOptions {}
+/** Provides internal utilities for implementing Beam metrics using Spark accumulators. */
+package org.apache.beam.runners.spark.structuredstreaming.metrics;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index daa802d..bb69b6d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast;
@@ -54,6 +55,8 @@ import scala.Tuple2;
 public class DoFnFunction<InputT, OutputT>
     implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
 
+  private final MetricsContainerStepMapAccumulator metricsAccum;
+  private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
   private transient boolean wasSetupCalled;
   private final WindowingStrategy<?, ?> windowingStrategy;
@@ -67,6 +70,8 @@ public class DoFnFunction<InputT, OutputT>
   private DoFnSchemaInformation doFnSchemaInformation;
 
   public DoFnFunction(
+      MetricsContainerStepMapAccumulator metricsAccum,
+      String stepName,
       DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
@@ -77,6 +82,8 @@ public class DoFnFunction<InputT, OutputT>
       Map<TupleTag<?>, Coder<?>> outputCoderMap,
       SideInputBroadcast broadcastStateData,
       DoFnSchemaInformation doFnSchemaInformation) {
+    this.metricsAccum = metricsAccum;
+    this.stepName = stepName;
     this.doFn = doFn;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
@@ -113,7 +120,13 @@ public class DoFnFunction<InputT, OutputT>
             windowingStrategy,
             doFnSchemaInformation);
 
-    return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+//    DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
+    DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
+        new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);
+
+//        return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+    return new ProcessContext<>(
+            doFn, doFnRunnerWithMetrics, outputManager, Collections.emptyIterator())
         .processPartition(iter)
         .iterator();
   }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
new file mode 100644
index 0000000..bc804ab
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsContainerStepMapAccumulator;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** DoFnRunner decorator which registers {@link MetricsContainerImpl}. */
+class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> delegate;
+  private final String stepName;
+  private final MetricsContainerStepMapAccumulator metricsAccum;
+
+  DoFnRunnerWithMetrics(
+      String stepName,
+      DoFnRunner<InputT, OutputT> delegate,
+      MetricsContainerStepMapAccumulator metricsAccum) {
+    this.delegate = delegate;
+    this.stepName = stepName;
+    this.metricsAccum = metricsAccum;
+  }
+
+  @Override
+  public DoFn<InputT, OutputT> getFn() {
+    return delegate.getFn();
+  }
+
+  @Override
+  public void startBundle() {
+    invokeWithMetricsContainer(delegate::startBundle);
+  }
+
+  @Override
+  public void processElement(final WindowedValue<InputT> elem) {
+    invokeWithMetricsContainer(() -> delegate.processElement(elem));
+  }
+
+  @Override
+  public void onTimer(
+      final String timerId,
+      final BoundedWindow window,
+      final Instant timestamp,
+      final TimeDomain timeDomain) {
+    invokeWithMetricsContainer(() -> delegate.onTimer(timerId, window, timestamp, timeDomain));
+  }
+
+  @Override
+  public void finishBundle() {
+    invokeWithMetricsContainer(delegate::finishBundle);
+  }
+
+  // Runs the given delegate call with this step's metrics container installed as the
+  // scoped MetricsEnvironment container, so user metrics are recorded in the accumulator.
+  // IOException can only arise from closing the scoped container.
+  private void invokeWithMetricsContainer(Runnable delegateAction) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
+      delegateAction.run();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private MetricsContainer metricsContainer() {
+    return metricsAccum.value().getContainer(stepName);
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 400b025..b3316e6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsContainerStepMapAccumulator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers;
@@ -62,6 +64,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
   @Override
   public void translateTransform(
       PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {
+    String stepName = context.getCurrentTransform().getFullName();
 
     // Check for not supported advanced features
     // TODO: add support of Splittable DoFn
@@ -101,9 +104,14 @@ class ParDoTranslatorBatch<InputT, OutputT>
     Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
     Coder<InputT> inputCoder = ((PCollection<InputT>) context.getInput()).getCoder();
 
+    MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance();
+//    MetricsContainerStepMapAccumulator metricsAccum = null;
+
     @SuppressWarnings("unchecked")
     DoFnFunction<InputT, OutputT> doFnWrapper =
         new DoFnFunction(
+            metricsAccum,
+            stepName,
             doFn,
             windowingStrategy,
             sideInputStrategies,


Mime
View raw message