beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [43/50] [abbrv] beam git commit: [BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline
Date Fri, 17 Mar 2017 21:47:14 GMT
[BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/874c8d0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/874c8d0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/874c8d0d

Branch: refs/heads/gearpump-runner
Commit: 874c8d0da65568b01cd5f184e303d39c7810a8bf
Parents: d167153
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Mon Mar 6 20:48:48 2017 +0200
Committer: Stas Levin <staslevin@apache.org>
Committed: Sun Mar 12 10:02:23 2017 +0200

----------------------------------------------------------------------
 .../spark/SparkNativePipelineVisitor.java       |  4 --
 .../beam/runners/spark/SparkPipelineResult.java |  8 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 65 ++++++++++----------
 .../beam/runners/spark/SparkRunnerDebugger.java | 30 ++++++---
 .../beam/runners/spark/TestSparkRunner.java     |  4 +-
 .../aggregators/AggregatorsAccumulator.java     | 44 +++++++++----
 .../spark/aggregators/SparkAggregators.java     | 40 ++----------
 .../spark/metrics/AggregatorMetricSource.java   | 11 ++--
 .../spark/metrics/MetricsAccumulator.java       | 38 ++++++++----
 .../spark/metrics/SparkBeamMetricSource.java    | 11 ++--
 .../spark/metrics/SparkMetricsContainer.java    | 17 ++---
 .../spark/translation/TransformTranslator.java  | 13 ++--
 .../SparkRunnerStreamingContextFactory.java     |  3 +
 .../streaming/StreamingTransformTranslator.java | 10 +--
 .../metrics/sink/NamedAggregatorsTest.java      | 15 +----
 .../ResumeFromCheckpointStreamingTest.java      |  4 +-
 16 files changed, 156 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 056da97..c2784a2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.spark;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -27,11 +26,9 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
-import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -55,7 +52,6 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator {
   SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt)
{
     super(translator, ctxt);
     this.transforms = new ArrayList<>();
-    MetricsAccumulator.init(ctxt.getSparkContext(), Optional.<Checkpoint.CheckpointDir>absent());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index ddc1964..ed1e0c8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricResults;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -84,13 +83,12 @@ public abstract class SparkPipelineResult implements PipelineResult {
       throws TimeoutException, ExecutionException, InterruptedException;
 
   public <T> T getAggregatorValue(final String name, final Class<T> resultType)
{
-    return SparkAggregators.valueOf(name, resultType, javaSparkContext);
+    return SparkAggregators.valueOf(name, resultType);
   }
 
   @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T>
aggregator)
-      throws AggregatorRetrievalException {
-    return SparkAggregators.valueOf(aggregator, javaSparkContext);
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T>
aggregator) {
+    return SparkAggregators.valueOf(aggregator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index a706f00..de648fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.spark;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,8 +26,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
 import org.apache.beam.runners.spark.metrics.CompositeSource;
@@ -59,7 +56,6 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.spark.Accumulator;
 import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
@@ -141,31 +137,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
     mOptions = options;
   }
 
-  private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc)
{
-    Optional<CheckpointDir> maybeCheckpointDir =
-        opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
-            : Optional.<CheckpointDir>absent();
-    final Accumulator<NamedAggregators> aggregatorsAccumulator =
-        SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir);
-    // Instantiate metrics accumulator
-    MetricsAccumulator.init(jsc, maybeCheckpointDir);
-    final NamedAggregators initialValue = aggregatorsAccumulator.value();
-    if (opts.getEnableSparkMetricSinks()) {
-      final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
-      String appName = opts.getAppName();
-      final AggregatorMetricSource aggregatorMetricSource =
-          new AggregatorMetricSource(appName, initialValue);
-      final SparkBeamMetricSource metricsSource =
-          new SparkBeamMetricSource(appName);
-      final CompositeSource compositeSource =
-          new CompositeSource(appName,
-              metricsSource.metricRegistry(), aggregatorMetricSource.metricRegistry());
-      // re-register the metrics in case of context re-use
-      metricsSystem.removeSource(compositeSource);
-      metricsSystem.registerSource(compositeSource);
-    }
-  }
-
   @Override
   public SparkPipelineResult run(final Pipeline pipeline) {
     LOG.info("Executing pipeline using the SparkRunner.");
@@ -203,11 +174,16 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
       // register Watermarks listener to broadcast the advanced WMs.
       jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc)));
 
+      // The reason we call initAccumulators here even though it is called in
+      // SparkRunnerStreamingContextFactory is because the factory is not called when resuming
+      // from checkpoint (When not resuming from checkpoint initAccumulators will be called
twice
+      // but this is fine since it is idempotent).
+      initAccumulators(mOptions, jssc.sparkContext());
+
       startPipeline = executorService.submit(new Runnable() {
 
         @Override
         public void run() {
-          registerMetrics(mOptions, jssc.sparkContext());
           LOG.info("Starting streaming pipeline execution.");
           jssc.start();
         }
@@ -218,11 +194,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
       final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
       final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
 
+      initAccumulators(mOptions, jsc);
+
       startPipeline = executorService.submit(new Runnable() {
 
         @Override
         public void run() {
-          registerMetrics(mOptions, jsc);
           pipeline.traverseTopologically(new Evaluator(new TransformTranslator.Translator(),
                                                        evaluationContext));
           evaluationContext.computeOutputs();
@@ -233,9 +210,35 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
       result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
     }
 
+    if (mOptions.getEnableSparkMetricSinks()) {
+      registerMetricsSource(mOptions.getAppName());
+    }
+
     return result;
   }
 
+  private void registerMetricsSource(String appName) {
+      final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
+      final AggregatorMetricSource aggregatorMetricSource =
+          new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
+      final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
+      final CompositeSource compositeSource =
+          new CompositeSource(appName + ".Beam", metricsSource.metricRegistry(),
+              aggregatorMetricSource.metricRegistry());
+      // re-register the metrics in case of context re-use
+      metricsSystem.removeSource(compositeSource);
+      metricsSystem.registerSource(compositeSource);
+  }
+
+  /**
+   * Init Metrics/Aggregators accumulators. This method is idempotent.
+   */
+  public static void initAccumulators(SparkPipelineOptions opts, JavaSparkContext jsc) {
+    // Init metrics accumulators
+    MetricsAccumulator.init(opts, jsc);
+    AggregatorsAccumulator.init(opts, jsc);
+  }
+
   /**
    * Detect the translation mode for the pipeline and change options in case streaming
    * translation is needed.

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 395acff..7f7aefc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -20,12 +20,14 @@ package org.apache.beam.runners.spark;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -53,23 +55,34 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerDebugger.class);
 
-  private SparkRunnerDebugger() {}
+  private final SparkPipelineOptions options;
+
+  private SparkRunnerDebugger(SparkPipelineOptions options) {
+    this.options = options;
+  }
 
-  @SuppressWarnings("unused")
   public static SparkRunnerDebugger fromOptions(PipelineOptions options) {
-    return new SparkRunnerDebugger();
+    if (options instanceof TestSparkPipelineOptions) {
+      TestSparkPipelineOptions testSparkPipelineOptions =
+          PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
+      return new SparkRunnerDebugger(testSparkPipelineOptions);
+    } else {
+      SparkPipelineOptions sparkPipelineOptions =
+          PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+      return new SparkRunnerDebugger(sparkPipelineOptions);
+    }
   }
 
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
-    SparkPipelineResult result;
-
-    SparkPipelineOptions options = (SparkPipelineOptions) pipeline.getOptions();
-
     JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
     JavaStreamingContext jssc =
         new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000));
+
+    SparkRunner.initAccumulators(options, jsc);
+
     TransformTranslator.Translator translator = new TransformTranslator.Translator();
+
     SparkNativePipelineVisitor visitor;
     if (options.isStreaming()
         || options instanceof TestSparkPipelineOptions
@@ -82,8 +95,11 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
       EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
       visitor = new SparkNativePipelineVisitor(translator, ctxt);
     }
+
     pipeline.traverseTopologically(visitor);
+
     jsc.stop();
+
     String debugString = visitor.getDebugString();
     LOG.info("Translated Native Spark pipeline:\n" + debugString);
     return new DebugSparkPipelineResult(debugString);

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index d321f99..e436422 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -34,7 +34,7 @@ import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
@@ -115,7 +115,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult>
{
 
     // clear state of Aggregators, Metrics and Watermarks if exists.
     AggregatorsAccumulator.clear();
-    SparkMetricsContainer.clear();
+    MetricsAccumulator.clear();
     GlobalWatermarkHolder.clear();
 
     LOG.info("About to run test pipeline " + testSparkPipelineOptions.getJobName());

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index 261c327..b8fc81b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.aggregators;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import java.io.IOException;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,30 +41,48 @@ import org.slf4j.LoggerFactory;
 public class AggregatorsAccumulator {
   private static final Logger LOG = LoggerFactory.getLogger(AggregatorsAccumulator.class);
 
+  private static final String ACCUMULATOR_NAME = "Beam.Aggregators";
   private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "aggregators";
 
-  private static volatile Accumulator<NamedAggregators> instance;
+  private static volatile Accumulator<NamedAggregators> instance = null;
   private static volatile FileSystem fileSystem;
   private static volatile Path checkpointFilePath;
 
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  static Accumulator<NamedAggregators> getInstance(
-      JavaSparkContext jsc,
-      Optional<CheckpointDir> checkpointDir) {
+  /**
+   * Init aggregators accumulator if it has not been initiated. This method is idempotent.
+   */
+  public static void init(SparkPipelineOptions opts, JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (AggregatorsAccumulator.class) {
         if (instance == null) {
-          instance = jsc.sc().accumulator(new NamedAggregators(), new AggAccumParam());
-          if (checkpointDir.isPresent()) {
-            recoverValueFromCheckpoint(jsc, checkpointDir.get());
+          Optional<CheckpointDir> maybeCheckpointDir =
+              opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
+                  : Optional.<CheckpointDir>absent();
+          Accumulator<NamedAggregators> accumulator =
+              jsc.sc().accumulator(new NamedAggregators(), ACCUMULATOR_NAME, new AggAccumParam());
+          if (maybeCheckpointDir.isPresent()) {
+            Optional<NamedAggregators> maybeRecoveredValue =
+                recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get());
+            if (maybeRecoveredValue.isPresent()) {
+              accumulator.setValue(maybeRecoveredValue.get());
+            }
           }
+          instance = accumulator;
         }
       }
+      LOG.info("Instantiated aggregators accumulator: " + instance.value());
+    }
+  }
+
+  public static Accumulator<NamedAggregators> getInstance() {
+    if (instance == null) {
+      throw new IllegalStateException("Aggregrators accumulator has not been instantiated");
+    } else {
+      return instance;
     }
-    return instance;
   }
 
-  private static void recoverValueFromCheckpoint(
+  private static Optional<NamedAggregators> recoverValueFromCheckpoint(
       JavaSparkContext jsc,
       CheckpointDir checkpointDir) {
     try {
@@ -72,14 +91,15 @@ public class AggregatorsAccumulator {
       fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration());
       NamedAggregators recoveredValue = Checkpoint.readObject(fileSystem, checkpointFilePath);
       if (recoveredValue != null) {
-        LOG.info("Recovered accumulators from checkpoint: " + recoveredValue);
-        instance.setValue(recoveredValue);
+        LOG.info("Recovered aggregators from checkpoint");
+        return Optional.of(recoveredValue);
       } else {
         LOG.info("No accumulator checkpoint found.");
       }
     } catch (Exception e) {
       throw new RuntimeException("Failure while reading accumulator checkpoint.", e);
     }
+    return Optional.absent();
   }
 
   private static void checkpoint() throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index 131b761..1da196b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -18,19 +18,16 @@
 
 package org.apache.beam.runners.spark.aggregators;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * A utility class for handling Beam {@link Aggregator}s.
@@ -64,41 +61,14 @@ public class SparkAggregators {
   }
 
   /**
-   * Retrieves the {@link NamedAggregators} instance using the provided Spark context.
-   *
-   * @param jsc a Spark context to be used in order to retrieve the name
-   * {@link NamedAggregators} instance
-   */
-  public static Accumulator<NamedAggregators> getNamedAggregators(JavaSparkContext
jsc) {
-    return getOrCreateNamedAggregators(jsc, Optional.<CheckpointDir>absent());
-  }
-
-  /**
-   * Retrieves or creates the {@link NamedAggregators} instance using the provided Spark
context.
-   *
-   * @param jsc a Spark context to be used in order to retrieve the name
-   * {@link NamedAggregators} instance
-   * @param checkpointDir checkpoint dir (optional, for streaming pipelines)
-   * @return a {@link NamedAggregators} instance
-   */
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  public static Accumulator<NamedAggregators> getOrCreateNamedAggregators(
-      JavaSparkContext jsc,
-      Optional<CheckpointDir> checkpointDir) {
-    return AggregatorsAccumulator.getInstance(jsc, checkpointDir);
-  }
-
-  /**
    * Retrieves the value of an aggregator from a SparkContext instance.
    *
    * @param aggregator The aggregator whose value to retrieve
-   * @param javaSparkContext The SparkContext instance
    * @param <T> The type of the aggregator's output
    * @return The value of the aggregator
    */
-  public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T>
aggregator,
-                                                final JavaSparkContext javaSparkContext)
{
-    return valueOf(getNamedAggregators(javaSparkContext), aggregator);
+  public static <T> AggregatorValues<T> valueOf(final Aggregator<?, T>
aggregator) {
+    return valueOf(AggregatorsAccumulator.getInstance(), aggregator);
   }
 
   /**
@@ -109,10 +79,8 @@ public class SparkAggregators {
    * @param <T>            Type of object to be returned.
    * @return The value of the aggregator.
    */
-  public static <T> T valueOf(final String name,
-                              final Class<T> typeClass,
-                              final JavaSparkContext javaSparkContext) {
-    return valueOf(getNamedAggregators(javaSparkContext), name, typeClass);
+  public static <T> T valueOf(final String name, final Class<T> typeClass) {
+    return valueOf(AggregatorsAccumulator.getInstance(), name, typeClass);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
index b3880e8..919e6f2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetricSource.java
@@ -28,19 +28,20 @@ import org.apache.spark.metrics.source.Source;
  * wrapping an underlying {@link NamedAggregators} instance.
  */
 public class AggregatorMetricSource implements Source {
+  private static final String METRIC_NAME = "Aggregators";
 
-  private final String sourceName;
+  private final String name;
 
   private final MetricRegistry metricRegistry = new MetricRegistry();
 
-  public AggregatorMetricSource(final String appName, final NamedAggregators aggregators)
{
-    sourceName = appName;
-    metricRegistry.register("Beam.Aggregators", AggregatorMetric.of(aggregators));
+  public AggregatorMetricSource(final String name, final NamedAggregators aggregators) {
+    this.name = name;
+    metricRegistry.register(METRIC_NAME, AggregatorMetric.of(aggregators));
   }
 
   @Override
   public String sourceName() {
-    return sourceName;
+    return name;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
index 9d48289..1153db6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.metrics;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import java.io.IOException;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,27 +41,37 @@ import org.slf4j.LoggerFactory;
 public class MetricsAccumulator {
   private static final Logger LOG = LoggerFactory.getLogger(MetricsAccumulator.class);
 
+  private static final String ACCUMULATOR_NAME = "Beam.Metrics";
   private static final String ACCUMULATOR_CHECKPOINT_FILENAME = "metrics";
 
   private static volatile Accumulator<SparkMetricsContainer> instance = null;
   private static volatile FileSystem fileSystem;
   private static volatile Path checkpointFilePath;
 
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  public static void init(
-      JavaSparkContext jsc,
-      Optional<CheckpointDir> checkpointDir) {
+  /**
+   * Init metrics accumulator if it has not been initiated. This method is idempotent.
+   */
+  public static void init(SparkPipelineOptions opts, JavaSparkContext jsc) {
     if (instance == null) {
       synchronized (MetricsAccumulator.class) {
         if (instance == null) {
-          SparkMetricsContainer initialValue = new SparkMetricsContainer();
-          instance = jsc.sc().accumulator(initialValue, "Beam.Metrics",
-              new MetricsAccumulatorParam());
-          if (checkpointDir.isPresent()) {
-            recoverValueFromCheckpoint(jsc, checkpointDir.get());
+          Optional<CheckpointDir> maybeCheckpointDir =
+              opts.isStreaming() ? Optional.of(new CheckpointDir(opts.getCheckpointDir()))
+                  : Optional.<CheckpointDir>absent();
+          Accumulator<SparkMetricsContainer> accumulator =
+              jsc.sc().accumulator(new SparkMetricsContainer(), ACCUMULATOR_NAME,
+                  new MetricsAccumulatorParam());
+          if (maybeCheckpointDir.isPresent()) {
+            Optional<SparkMetricsContainer> maybeRecoveredValue =
+                recoverValueFromCheckpoint(jsc, maybeCheckpointDir.get());
+            if (maybeRecoveredValue.isPresent()) {
+              accumulator.setValue(maybeRecoveredValue.get());
+            }
           }
+          instance = accumulator;
         }
       }
+      LOG.info("Instantiated metrics accumulator: " + instance.value());
     }
   }
 
@@ -72,7 +83,7 @@ public class MetricsAccumulator {
     }
   }
 
-  private static void recoverValueFromCheckpoint(
+  private static Optional<SparkMetricsContainer> recoverValueFromCheckpoint(
       JavaSparkContext jsc,
       CheckpointDir checkpointDir) {
     try {
@@ -81,18 +92,19 @@ public class MetricsAccumulator {
       fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration());
       SparkMetricsContainer recoveredValue = Checkpoint.readObject(fileSystem, checkpointFilePath);
       if (recoveredValue != null) {
-        LOG.info("Recovered metrics from checkpoint: " + recoveredValue);
-        instance.setValue(recoveredValue);
+        LOG.info("Recovered metrics from checkpoint.");
+        return Optional.of(recoveredValue);
       } else {
         LOG.info("No metrics checkpoint found.");
       }
     } catch (Exception e) {
       throw new RuntimeException("Failure while reading metrics checkpoint.", e);
     }
+    return Optional.absent();
   }
 
   @VisibleForTesting
-  static void clear() {
+  public static void clear() {
     synchronized (MetricsAccumulator.class) {
       instance = null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
index 24231c3..9cab66d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java
@@ -28,19 +28,20 @@ import org.apache.spark.metrics.source.Source;
  * wrapping an underlying {@link SparkMetricsContainer} instance.
  */
 public class SparkBeamMetricSource implements Source {
+  private static final String METRIC_NAME = "Metrics";
 
-  private final String sourceName;
+  private final String name;
 
   private final MetricRegistry metricRegistry = new MetricRegistry();
 
-  public SparkBeamMetricSource(final String appName) {
-    sourceName = appName;
-    metricRegistry.register("Beam.Metrics", new SparkBeamMetric());
+  public SparkBeamMetricSource(final String name) {
+    this.name = name;
+    metricRegistry.register(METRIC_NAME, new SparkBeamMetric());
   }
 
   @Override
   public String sourceName() {
-    return sourceName;
+    return name;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 7a4b222..d376ce3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.spark.metrics;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -66,11 +65,15 @@ public class SparkMetricsContainer implements Serializable {
   }
 
   static Collection<MetricUpdate<Long>> getCounters() {
-    return getInstance().counters.values();
+    SparkMetricsContainer sparkMetricsContainer = getInstance();
+    sparkMetricsContainer.materialize();
+    return sparkMetricsContainer.counters.values();
   }
 
   static Collection<MetricUpdate<DistributionData>> getDistributions() {
-    return getInstance().distributions.values();
+    SparkMetricsContainer sparkMetricsContainer = getInstance();
+    sparkMetricsContainer.materialize();
+    return sparkMetricsContainer.distributions.values();
   }
 
   SparkMetricsContainer update(SparkMetricsContainer other) {
@@ -141,12 +144,4 @@ public class SparkMetricsContainer implements Serializable {
     }
     return sb.toString();
   }
-
-  @VisibleForTesting
-  public static void clear() {
-    try {
-      MetricsAccumulator.clear();
-    } catch (IllegalStateException ignored) {
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 44b4039..8d1b82e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -37,8 +37,8 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.SourceRDD;
 import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
@@ -138,8 +138,7 @@ public final class TransformTranslator {
             ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD();
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-        final Accumulator<NamedAggregators> accum =
-            SparkAggregators.getNamedAggregators(context.getSparkContext());
+        final Accumulator<NamedAggregators> accum = AggregatorsAccumulator.getInstance();
         @SuppressWarnings("unchecked")
         final WindowingStrategy<?, W> windowingStrategy =
             (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
@@ -362,9 +361,7 @@ public final class TransformTranslator {
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        JavaSparkContext jsc = context.getSparkContext();
-        Accumulator<NamedAggregators> aggAccum =
-            SparkAggregators.getNamedAggregators(jsc);
+        Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
         Accumulator<SparkMetricsContainer> metricsAccum =
             MetricsAccumulator.getInstance();
         Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs =
@@ -395,9 +392,7 @@ public final class TransformTranslator {
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        JavaSparkContext jsc = context.getSparkContext();
-        Accumulator<NamedAggregators> aggAccum =
-            SparkAggregators.getNamedAggregators(jsc);
+        Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
         Accumulator<SparkMetricsContainer> metricsAccum =
             MetricsAccumulator.getInstance();
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index ffa8e69..7048be6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -79,6 +79,9 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
 
+    // We must first init accumulators since translators expect them to be instantiated.
+    SparkRunner.initAccumulators(options, jsc);
+
     ctxt = new EvaluationContext(jsc, pipeline, jssc);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 8a05fbb..2744169 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -32,8 +32,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nonnull;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
@@ -92,7 +92,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 
-
 /**
  * Supports translation between a Beam transform, and Spark's operations on DStreams.
  */
@@ -394,8 +393,7 @@ public final class StreamingTransformTranslator {
           public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>>
rdd) throws
               Exception {
             final JavaSparkContext jsc = new JavaSparkContext(rdd.context());
-            final Accumulator<NamedAggregators> aggAccum =
-                SparkAggregators.getNamedAggregators(jsc);
+            final Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
             final Accumulator<SparkMetricsContainer> metricsAccum =
                 MetricsAccumulator.getInstance();
             final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs =
@@ -444,9 +442,7 @@ public final class StreamingTransformTranslator {
           public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
               JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
             String stepName = context.getCurrentTransform().getFullName();
-            JavaSparkContext jsc = new JavaSparkContext(rdd.context());
-            final Accumulator<NamedAggregators> aggAccum =
-                SparkAggregators.getNamedAggregators(jsc);
+            final Accumulator<NamedAggregators> aggAccum = AggregatorsAccumulator.getInstance();
             final Accumulator<SparkMetricsContainer> metricsAccum =
                 MetricsAccumulator.getInstance();
             final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>>
sideInputs =

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index a192807..dbd8cac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -27,14 +27,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -86,24 +83,18 @@ public class NamedAggregatorsTest {
 
   @Test
   public void testNamedAggregators() throws Exception {
-
-    // don't reuse context in this test, as is tends to mess up Spark's MetricsSystem thread-safety
-    System.setProperty("beam.spark.test.reuseSparkContext", "false");
-
     assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
 
     runPipeline();
 
     assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
-
   }
 
   @Test
   public void testNonExistingAggregatorName() throws Exception {
-    final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    final Long valueOf =
-        SparkAggregators.valueOf(
-            "myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options));
+    runPipeline();
+
+    final Long valueOf = SparkAggregators.valueOf("myMissingAggregator", Long.class);
 
     assertThat(valueOf, is(nullValue()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index bc22980..ce502d6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
@@ -171,7 +171,7 @@ public class ResumeFromCheckpointStreamingTest {
 
     //- clear state.
     AggregatorsAccumulator.clear();
-    SparkMetricsContainer.clear();
+    MetricsAccumulator.clear();
     GlobalWatermarkHolder.clear();
 
     //- write a bit more.


Mime
View raw message