beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: Remove aggregators from DoFn contexts and internal SDK usage
Date Tue, 02 May 2017 05:30:07 GMT
Repository: beam
Updated Branches:
  refs/heads/master 19ae45b2f -> 87cf88ade


Remove aggregators from DoFn contexts and internal SDK usage


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4253a60f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4253a60f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4253a60f

Branch: refs/heads/master
Commit: 4253a60f4ef425cb9845f4b6cbd79cace2d18e20
Parents: 19ae45b
Author: Pablo <pabloem@google.com>
Authored: Wed Apr 26 10:55:53 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon May 1 22:17:46 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |   1 -
 .../apache/beam/runners/core/DoFnAdapters.java  |  18 --
 .../apache/beam/runners/core/DoFnRunners.java   |   6 +-
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   1 -
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  12 -
 .../core/LateDataDroppingDoFnRunner.java        |  20 +-
 ...eBoundedSplittableProcessElementInvoker.java |   8 -
 .../beam/runners/core/ReduceFnRunner.java       |  15 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |  24 --
 .../beam/runners/core/SplittableParDo.java      |   8 -
 .../core/LateDataDroppingDoFnRunnerTest.java    |  20 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   |  69 ++++--
 .../beam/runners/core/ReduceFnTester.java       |  38 ----
 .../beam/runners/direct/DirectRunner.java       |  10 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   1 -
 .../wrappers/streaming/DoFnOperator.java        |   3 +-
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   1 -
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   1 -
 .../beam/sdk/AggregatorPipelineExtractor.java   |  10 +-
 .../apache/beam/sdk/transforms/Aggregator.java  |  24 --
 .../sdk/transforms/AggregatorRetriever.java     |  45 ----
 .../org/apache/beam/sdk/transforms/DoFn.java    | 120 +---------
 .../apache/beam/sdk/transforms/DoFnTester.java  |  64 ------
 .../org/apache/beam/sdk/transforms/Latest.java  |   3 +-
 .../sdk/AggregatorPipelineExtractorTest.java    | 226 -------------------
 .../apache/beam/sdk/transforms/DoFnTest.java    | 162 -------------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  47 +---
 .../beam/sdk/transforms/LatestFnTest.java       |  40 ----
 30 files changed, 106 insertions(+), 894 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 3c9f5ab..4fdb600 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -192,7 +192,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
           }
         },
         NullSideInputReader.empty(),
-        null,
         SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder),
         serializedOptions.get());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 66ad736..b2069e1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -19,9 +19,6 @@ package org.apache.beam.runners.core;
 
 import java.io.IOException;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
@@ -72,7 +69,6 @@ public class DoFnAdapters {
     private transient DoFnInvoker<InputT, OutputT> invoker;
 
     SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
-      super(AggregatorRetriever.getDelegatingAggregators(fn));
       this.fn = fn;
       this.invoker = DoFnInvokers.invokerFor(fn);
     }
@@ -143,7 +139,6 @@ public class DoFnAdapters {
     private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
       fn.super();
       this.context = context;
-      super.setupDelegateAggregators();
     }
 
     @Override
@@ -172,13 +167,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name,
-        CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
     public BoundedWindow window() {
       // The OldDoFn doesn't allow us to ask for these outside processElement, so this
       // should be unreachable.
@@ -265,12 +253,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      return context.createAggregatorInternal(name, combiner);
-    }
-
-    @Override
     public InputT element() {
       return context.element();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 8501e72..4384b39 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -115,13 +115,11 @@ public class DoFnRunners {
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
           DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> wrappedRunner,
           StepContext stepContext,
-          WindowingStrategy<?, W> windowingStrategy,
-          Aggregator<Long, Long> droppedDueToLatenessAggregator) {
+          WindowingStrategy<?, W> windowingStrategy) {
     return new LateDataDroppingDoFnRunner<>(
         wrappedRunner,
         windowingStrategy,
-        stepContext.timerInternals(),
-        droppedDueToLatenessAggregator);
+        stepContext.timerInternals());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 5508b2e..9e66f07 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -70,7 +70,6 @@ public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends
             timerInternals,
             WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
             WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
-            droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index bf48df1..05572ea 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -84,7 +84,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             timerInternals,
             WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
             WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
-            droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 0cf6e2d..de4ac29 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -21,9 +21,7 @@ import java.util.Collection;
 import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
@@ -59,11 +57,6 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
             reduceFn, outputManager, mainTag);
   }
 
-  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
-      createAggregator(
-          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
-  protected final Aggregator<Long, Long> droppedDueToLateness =
-      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
   private final WindowingStrategy<Object, W> windowingStrategy;
   private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
   private transient StateInternalsFactory<K> stateInternalsFactory;
@@ -135,7 +128,6 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
             timerInternals,
             outputWindowedValue(),
             sideInputReader,
-            droppedDueToClosedWindow,
             reduceFn,
             c.getPipelineOptions());
 
@@ -147,8 +139,4 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn<
   public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
     throw new RuntimeException("Not implement!");
   }
-
-  public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
-    return droppedDueToLateness;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 4d41527..d6ba8f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -21,7 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowTracing;
@@ -48,13 +49,14 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
   private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
   private final LateDataFilter lateDataFilter;
 
+  public static final String DROPPED_DUE_TO_LATENESS = "droppedDueToLateness";
+
   public LateDataDroppingDoFnRunner(
       DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
       WindowingStrategy<?, ?> windowingStrategy,
-      TimerInternals timerInternals,
-      Aggregator<Long, Long> droppedDueToLateness) {
+      TimerInternals timerInternals) {
     this.doFnRunner = doFnRunner;
-    lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
+    lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals);
   }
 
   @Override
@@ -89,15 +91,15 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
   static class LateDataFilter {
     private final WindowingStrategy<?, ?> windowingStrategy;
     private final TimerInternals timerInternals;
-    private final Aggregator<Long, Long> droppedDueToLateness;
+    private final Counter droppedDueToLateness;
 
     public LateDataFilter(
         WindowingStrategy<?, ?> windowingStrategy,
-        TimerInternals timerInternals,
-        Aggregator<Long, Long> droppedDueToLateness) {
+        TimerInternals timerInternals) {
       this.windowingStrategy = windowingStrategy;
       this.timerInternals = timerInternals;
-      this.droppedDueToLateness = droppedDueToLateness;
+      this.droppedDueToLateness = Metrics.counter(LateDataDroppingDoFnRunner.class,
+          DROPPED_DUE_TO_LATENESS);
     }
 
     /**
@@ -129,7 +131,7 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
         BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
         if (canDropDueToExpiredWindow(window)) {
           // The element is too late for this window.
-          droppedDueToLateness.addValue(1L);
+          droppedDueToLateness.inc();
           WindowTracing.debug(
               "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
               + "since too far behind inputWatermark:{}; outputWatermark:{}",

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index d132af6..35d6eb7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -27,8 +27,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -271,11 +269,5 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
         initiateCheckpoint();
       }
     }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0be7c95..08014ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -42,8 +42,9 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -72,7 +73,7 @@ import org.joda.time.Instant;
  *
  * <ul>
  * <li>Tracking the windows that are active (have buffered data) as elements arrive and triggers are
- *     fired.
+ *  fired.
  * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it when the
  *     trigger fires.
  * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
@@ -109,7 +110,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 
   private final StateInternals<K> stateInternals;
 
-  private final Aggregator<Long, Long> droppedDueToClosedWindow;
+  private final Counter droppedDueToClosedWindow;
+
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW = "droppedDueToClosedWindow";
 
   private final K key;
 
@@ -215,7 +218,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       TimerInternals timerInternals,
       OutputWindowedValue<KV<K, OutputT>> outputter,
       SideInputReader sideInputReader,
-      Aggregator<Long, Long> droppedDueToClosedWindow,
       ReduceFn<K, InputT, OutputT, W> reduceFn,
       PipelineOptions options) {
     this.key = key;
@@ -223,8 +225,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     this.paneInfoTracker = new PaneInfoTracker(timerInternals);
     this.stateInternals = stateInternals;
     this.outputter = outputter;
-    this.droppedDueToClosedWindow = droppedDueToClosedWindow;
     this.reduceFn = reduceFn;
+    this.droppedDueToClosedWindow = Metrics.counter(ReduceFnRunner.class,
+        DROPPED_DUE_TO_CLOSED_WINDOW);
 
     @SuppressWarnings("unchecked")
     WindowingStrategy<Object, W> objectWindowingStrategy =
@@ -581,7 +584,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
           window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
       if (triggerRunner.isClosed(directContext.state())) {
         // This window has already been closed.
-        droppedDueToClosedWindow.addValue(1L);
+        droppedDueToClosedWindow.inc();
         WindowTracing.debug(
             "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
             + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 141bf20..a5733da 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -32,8 +32,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
@@ -277,7 +275,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       this.stepContext = stepContext;
       this.aggregatorFactory = aggregatorFactory;
       this.windowFn = windowFn;
-      super.setupDelegateAggregators();
     }
 
     //////////////////////////////////////////////////////////////////////////////
@@ -409,13 +406,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      checkNotNull(combiner, "Combiner passed to createAggregator cannot be null");
-      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
-    }
-
-    @Override
     public BoundedWindow window() {
       throw new UnsupportedOperationException(
           "Cannot access window outside of @ProcessElement and @OnTimer methods.");
@@ -600,13 +590,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    protected <AggregatorInputT, AggregatorOutputT>
-        Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator(
-            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
-      return context.createAggregator(name, combiner);
-    }
-
-    @Override
     public BoundedWindow window() {
       return Iterables.getOnlyElement(windowedValue.getWindows());
     }
@@ -797,13 +780,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       context.outputWindowedValue(
           tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name,
-        CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method");
-    }
   }
 
   private static class TimerInternalsTimer implements Timer {

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 5273e86..0fa6f76 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -29,8 +29,6 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -552,12 +550,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
           throwUnsupportedOutput();
         }
 
-        @Override
-        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-            String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-          return fn.createAggregator(name, combiner);
-        }
-
         private void throwUnsupportedOutput() {
           throw new UnsupportedOperationException(
               String.format(

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 57ef8f0..74fb562 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -26,6 +26,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Sum;
@@ -58,12 +61,11 @@ public class LateDataDroppingDoFnRunnerTest {
 
   @Test
   public void testLateDataFilter() throws Exception {
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
     when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L));
 
-    InMemoryLongSumAggregator droppedDueToLateness =
-        new InMemoryLongSumAggregator("droppedDueToLateness");
     LateDataFilter lateDataFilter = new LateDataFilter(
-        WindowingStrategy.of(WINDOW_FN), mockTimerInternals, droppedDueToLateness);
+        WindowingStrategy.of(WINDOW_FN), mockTimerInternals);
 
     Iterable<WindowedValue<Integer>> actual = lateDataFilter.filter(
         "a",
@@ -78,10 +80,18 @@ public class LateDataDroppingDoFnRunnerTest {
         createDatum(16, 16L),
         createDatum(18, 18L));
     assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
-    assertEquals(1, droppedDueToLateness.sum);
+    long droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(LateDataDroppingDoFnRunner.class,
+            LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS))
+        .getCumulative().longValue();
+    assertEquals(1, droppedValues);
     // Ensure that reiterating returns the same results and doesn't increment the counter again.
     assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
-    assertEquals(1, droppedDueToLateness.sum);
+    droppedValues = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(LateDataDroppingDoFnRunner.class,
+            LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS))
+        .getCumulative().longValue();
+    assertEquals(1, droppedValues);
   }
 
   private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index ec2e7a3..7b91151 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -40,6 +40,9 @@ import com.google.common.collect.Iterables;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -143,6 +146,7 @@ public class ReduceFnRunnerTest {
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and discarding mode.
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
         ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
             AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
@@ -167,7 +171,11 @@ public class ReduceFnRunnerTest {
     // This element shouldn't be seen, because the trigger has finished
     injectElement(tester, 4);
 
-    assertEquals(1, tester.getElementsDroppedDueToClosedWindow());
+    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(1, droppedElements);
   }
 
   @Test
@@ -416,6 +424,7 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testWatermarkHoldAndLateData() throws Exception {
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
     // Test handling of late data. Specifically, ensure the watermark hold is correct.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
         ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine,
@@ -444,7 +453,11 @@ public class ReduceFnRunnerTest {
     // Holding for the end-of-window transition.
     assertEquals(new Instant(9), tester.getWatermarkHold());
     // Nothing dropped.
-    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(0, droppedElements);
 
     // Input watermark -> 4, output watermark should advance that far as well
     tester.advanceInputWatermark(new Instant(4));
@@ -502,7 +515,11 @@ public class ReduceFnRunnerTest {
     // Because we're about to expire the window, we output it.
     when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
     injectElement(tester, 8);
-    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+    droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(0, droppedElements);
 
     // Exceed the GC limit, triggering the last pane to be fired
     tester.advanceInputWatermark(new Instant(50));
@@ -1067,13 +1084,13 @@ public class ReduceFnRunnerTest {
    */
   @Test
   public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
-    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
-        ReduceFnTester.combining(
-            WindowingStrategy.of(SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
-                .withTrigger(AfterWatermark.pastEndOfWindow())
-                .withAllowedLateness(Duration.millis(1000)),
-            Sum.ofIntegers(),
-            VarIntCoder.of());
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
+        WindowingStrategy.of(
+            SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
+        .withTrigger(AfterWatermark.pastEndOfWindow())
+        .withAllowedLateness(Duration.millis(1000)),
+        Sum.ofIntegers(), VarIntCoder.of());
 
     tester.injectElements(
         // assigned to [-60, 40), [-30, 70), [0, 100)
@@ -1081,7 +1098,11 @@ public class ReduceFnRunnerTest {
         // assigned to [-30, 70), [0, 100), [30, 130)
         TimestampedValue.of(12, new Instant(40)));
 
-    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(0, droppedElements);
 
     tester.advanceInputWatermark(new Instant(70));
     tester.injectElements(
@@ -1089,18 +1110,27 @@ public class ReduceFnRunnerTest {
         // but [-30, 70) is closed by the trigger
         TimestampedValue.of(14, new Instant(60)));
 
-    assertEquals(1, tester.getElementsDroppedDueToClosedWindow());
+    droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(1, droppedElements);
 
     tester.advanceInputWatermark(new Instant(130));
     // assigned to [-30, 70), [0, 100), [30, 130)
     // but they are all closed
     tester.injectElements(TimestampedValue.of(16, new Instant(40)));
 
-    assertEquals(4, tester.getElementsDroppedDueToClosedWindow());
+    droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(4, droppedElements);
   }
 
   @Test
   public void testIdempotentEmptyPanesDiscarding() throws Exception {
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
     // Test uninteresting (empty) panes don't increment the index or otherwise
     // modify PaneInfo.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
@@ -1141,11 +1171,16 @@ public class ReduceFnRunnerTest {
     assertTrue(tester.isMarkedFinished(firstWindow));
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
 
-    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(0, droppedElements);
   }
 
   @Test
   public void testIdempotentEmptyPanesAccumulating() throws Exception {
+    MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
     // Test uninteresting (empty) panes don't increment the index or otherwise
     // modify PaneInfo.
     ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
@@ -1188,7 +1223,11 @@ public class ReduceFnRunnerTest {
     assertTrue(tester.isMarkedFinished(firstWindow));
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
 
-    assertEquals(0, tester.getElementsDroppedDueToClosedWindow());
+    long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+        MetricName.named(ReduceFnRunner.class,
+            ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+        .getCumulative().longValue();
+    assertEquals(0, droppedElements);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index dfb769f..eba0f67 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -51,10 +51,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -111,9 +109,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
    */
   private boolean autoAdvanceOutputWatermark = true;
 
-  private final InMemoryLongSumAggregator droppedDueToClosedWindow =
-      new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
-
   /**
    * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating
    * a {@link TriggerStateMachine} from its {@link Trigger}.
@@ -303,7 +298,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
         timerInternals,
         testOutputter,
         sideInputReader,
-        droppedDueToClosedWindow,
         reduceFn,
         options);
   }
@@ -402,10 +396,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     return timerInternals.currentOutputWatermarkTime();
   }
 
-  public long getElementsDroppedDueToClosedWindow() {
-    return droppedDueToClosedWindow.getSum();
-  }
-
   /**
    * How many panes do we have in the output?
    */
@@ -611,32 +601,4 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
       return window;
     }
   }
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<Long, ?, Long> getCombineFn() {
-      return Sum.ofLongs();
-    }
-
-    public long getSum() {
-      return sum;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index db2d252..e063bc3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -43,7 +43,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -284,9 +283,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
             context);
     executor.start(graph.getRootTransforms());
 
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        pipeline.getAggregatorSteps();
-    DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps);
+    DirectPipelineResult result = new DirectPipelineResult(executor, context);
     if (options.isBlockOnRun()) {
       try {
         result.waitUntilFinish();
@@ -353,16 +350,13 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   public static class DirectPipelineResult implements PipelineResult {
     private final PipelineExecutor executor;
     private final EvaluationContext evaluationContext;
-    private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
     private State state;
 
     private DirectPipelineResult(
         PipelineExecutor executor,
-        EvaluationContext evaluationContext,
-        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
+        EvaluationContext evaluationContext) {
       this.executor = executor;
       this.evaluationContext = evaluationContext;
-      this.aggregatorSteps = aggregatorSteps;
       // Only ever constructed after the executor has started.
       this.state = State.RUNNING;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index ce7b12a..f1e29c6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -178,7 +178,6 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
               timerInternals,
               new OutputWindowedValueToBundle<>(bundle),
               new UnsupportedSideInputReader("GroupAlsoByWindow"),
-              droppedDueToClosedWindow,
               reduceFn,
               evaluationContext.getPipelineOptions());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index d3d9078..62d7a9c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -299,8 +299,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       doFnRunner = DoFnRunners.lateDataDroppingRunner(
           (DoFnRunner) doFnRunner,
           stepContext,
-          windowingStrategy,
-          ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
+          windowingStrategy);
     } else if (keyCoder != null) {
       // It is a stateful DoFn
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 4b6ca98..4c9d8d3 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170501</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170501-pr2718</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 029c28a..c59e0e7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -261,7 +261,6 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                               timerInternals,
                               outputHolder,
                               new UnsupportedSideInputReader("GroupAlsoByWindow"),
-                              droppedDueToClosedWindow,
                               reduceFn,
                               runtimeContext.getPipelineOptions());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index ccc0fa3..0a00c45 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -107,7 +107,6 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde
             timerInternals,
             outputter,
             new UnsupportedSideInputReader("GroupAlsoByWindow"),
-            droppedDueToClosedWindow,
             reduceFn,
             runtimeContext.getPipelineOptions());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
index 8804f55..eeb9b45 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PValue;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.values.PValue;
  * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
  * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
  */
+@Deprecated
 class AggregatorPipelineExtractor {
   private final Pipeline pipeline;
 
@@ -68,14 +68,6 @@ class AggregatorPipelineExtractor {
     }
 
     private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
-      if (transform != null) {
-        if (transform instanceof ParDo.SingleOutput) {
-          return AggregatorRetriever.getAggregators(((ParDo.SingleOutput<?, ?>) transform).getFn());
-        } else if (transform instanceof ParDo.MultiOutput) {
-          return AggregatorRetriever.getAggregators(
-              ((ParDo.MultiOutput<?, ?>) transform).getFn());
-        }
-      }
       return Collections.emptyList();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 4119c53..c957100 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -23,30 +23,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  * An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT},
  * to be combined across all bundles.
  *
- * <p>Aggregators are created by calling
- * {@link DoFn#createAggregator DoFn.createAggregator},
- * typically from the {@link DoFn} constructor. Elements can be added to the
- * {@code Aggregator} by calling {@link Aggregator#addValue}.
- *
- * <p>It is runner-dependent whether aggregators are accessible during pipeline execution or only
- * after jobs have completed.
- *
- * <p>Example:
- * <pre>{@code
- * class MyDoFn extends DoFn<String, String> {
- *   private Aggregator<Integer, Integer> myAggregator;
- *
- *   public MyDoFn() {
- *     myAggregator = createAggregator("myAggregator", new Sum.SumIntegerFn());
- *   }
- *
- *   {@literal @}ProcessElement
- *   public void processElement(ProcessContext c) {
- *     myAggregator.addValue(1);
- *   }
- * }
- * }</pre>
- *
  * @param <InputT> the type of input values
  * @param <OutputT> the type of output values
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
deleted file mode 100644
index b1d3ead..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
- */
-public final class AggregatorRetriever {
-  private AggregatorRetriever() {
-    // do not instantiate
-  }
-
-  /**
-   * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
-   */
-  public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
-    return fn.getAggregators();
-  }
-
-  /**
-   * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link
-   * DoFn}.
-   */
-  public static Map<String, DelegatingAggregator<?, ?>> getDelegatingAggregators(DoFn<?, ?> fn) {
-    return fn.aggregators;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index d3da251..9b99ca4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -17,25 +17,16 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.Serializable;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
@@ -209,48 +200,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      */
     public abstract <T> void outputWithTimestamp(
         TupleTag<T> tag, T output, Instant timestamp);
-
-    /**
-     * Creates an {@link Aggregator} in the {@link DoFn} context with the specified name and
-     * aggregation logic specified by {@link CombineFn}. This is to be overridden by a particular
-     * runner context with an implementation that delivers the values as appropriate.
-     *
-     * <p>The aggregators declared on the {@link DoFn} will be wired up to aggregators allocated via
-     * this method.
-     *
-     * @param name the name of the aggregator
-     * @param combiner the {@link CombineFn} to use in the aggregator
-     * @return an aggregator for the provided name and {@link CombineFn} in this context
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected abstract <AggInputT, AggOutputT>
-        Aggregator<AggInputT, AggOutputT> createAggregator(
-            String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
-    /**
-     * Sets up {@link Aggregator}s created by the {@link DoFn} so they are usable within this
-     * context.
-     *
-     * <p>This method should be called by runners before the {@link StartBundle @StartBundle}
-     * method.
-     */
-    @Experimental(Kind.AGGREGATOR)
-    protected final void setupDelegateAggregators() {
-      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
-        setupDelegateAggregator(aggregator);
-      }
-
-      aggregatorsAreFinal = true;
-    }
-
-    private <AggInputT, AggOutputT> void setupDelegateAggregator(
-        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
-      Aggregator<AggInputT, AggOutputT> delegate = createAggregator(
-          aggregator.getName(), aggregator.getCombineFn());
-
-      aggregator.setDelegate(delegate);
-    }
   }
 
   /**
@@ -347,17 +296,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
 
   /////////////////////////////////////////////////////////////////////////////
 
-  protected Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
-
-  Collection<Aggregator<?, ?>> getAggregators() {
-    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
-  }
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
   /**
    * Returns a {@link TypeDescriptor} capturing what is known statically
    * about the input type of this {@code DoFn} instance's most-derived
@@ -699,67 +637,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   public @interface UnboundedPerElement {}
 
   /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the {@link CombineFn}
-   * argument. The name provided must be unique across {@link Aggregator}s created within the {@link
-   * DoFn}. Aggregators can only be created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of this {@link DoFn}
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another aggregator in this
-   *     scope
-   * @throws IllegalStateException if called during pipeline execution.
-   */
-  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-      String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(
-        !aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-            + " An Aggregator with that name already exists within this scope.",
-        name);
-    checkState(
-        !aggregatorsAreFinal,
-        "Cannot create an aggregator during pipeline execution."
-            + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be
-   * created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this {@link DoFn}
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline execution.
-   */
-  public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
-      String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
    * Finalize the {@link DoFn} construction to prepare for processing.
    * This method should be called by runners before any processing methods.
    */
   @Deprecated
-  public final void prepareForProcessing() {
-    aggregatorsAreFinal = true;
-  }
+  public final void prepareForProcessing() {}
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 5446431..813975c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -220,7 +220,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       initializeState();
     }
     TestContext context = new TestContext();
-    context.setupDelegateAggregators();
     try {
       fnInvoker.invokeStartBundle(context);
     } catch (UserCodeException e) {
@@ -503,13 +502,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return resultElems;
   }
 
-  /**
-   * Returns the value of the provided {@link Aggregator}.
-   */
-  public <AggregateT> AggregateT getAggregatorValue(Aggregator<?, AggregateT> agg) {
-    return extractAggregatorValue(agg.getName(), agg.getCombineFn());
-  }
-
   private <AccumT, AggregateT> AggregateT extractAggregatorValue(
       String name, CombineFn<?, AccumT, AggregateT> combiner) {
     @SuppressWarnings("unchecked")
@@ -575,53 +567,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       throw new UnsupportedOperationException(
           "DoFnTester doesn't support output from bundle methods");
     }
-
-    @Override
-    protected <AggInT, AggOutT> Aggregator<AggInT, AggOutT> createAggregator(
-        final String name, final CombineFn<AggInT, ?, AggOutT> combiner) {
-      return aggregator(name, combiner);
-    }
-
-    private <AinT, AccT, AoutT> Aggregator<AinT, AoutT> aggregator(
-        final String name,
-        final CombineFn<AinT, AccT, AoutT> combiner) {
-
-      Aggregator<AinT, AoutT> aggregator = new Aggregator<AinT, AoutT>() {
-        @Override
-        public void addValue(AinT value) {
-          AccT accum = (AccT) accumulators.get(name);
-          AccT newAccum = combiner.addInput(accum, value);
-          accumulators.put(name, newAccum);
-        }
-
-        @Override
-        public String getName() {
-          return name;
-        }
-
-        @Override
-        public CombineFn<AinT, ?, AoutT> getCombineFn() {
-          return combiner;
-        }
-      };
-
-      // Aggregator instantiation is idempotent
-      if (accumulators.containsKey(name)) {
-        Class<?> currentAccumClass = accumulators.get(name).getClass();
-        Class<?> createAccumClass = combiner.createAccumulator().getClass();
-        checkState(
-            currentAccumClass.isAssignableFrom(createAccumClass),
-            "Aggregator %s already initialized with accumulator type %s "
-                + "but was re-initialized with accumulator type %s",
-            name,
-            currentAccumClass,
-            createAccumClass);
-
-      } else {
-        accumulators.put(name, combiner.createAccumulator());
-      }
-      return aggregator;
-    }
   }
 
   public DoFn<InputT, OutputT>.ProcessContext createProcessContext(
@@ -700,15 +645,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       getMutableOutput(tag)
           .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
     }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
-        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new IllegalStateException("Aggregators should not be created within ProcessContext. "
-          + "Instead, create an aggregator at DoFn construction time with"
-          + " createAggregator, and ensure they are set up by the time startBundle is"
-          + " called with setupDelegateAggregators.");
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index e6892c5..4cb1142 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -97,8 +97,7 @@ public class Latest {
   }
 
   /**
-   * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is
-   * particularly useful as an {@link Aggregator}.
+   * A {@link Combine.CombineFn} that computes the latest element from a set of inputs.
    *
    * @param <T> Type of input element.
    * @see Latest

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
deleted file mode 100644
index 0d18840..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests for {@link AggregatorPipelineExtractor}.
- */
-@RunWith(JUnit4.class)
-public class AggregatorPipelineExtractorTest {
-  @Mock
-  private Pipeline p;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithParDoSingleOutputExtractsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo");
-    AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
-    when(parDo.getFn()).thenReturn(fn);
-
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
-    Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(Min.ofIntegers());
-
-    TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
-    when(transformNode.getTransform()).thenReturn(parDo);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorOne));
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(aggregatorSteps.size(), 2);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithParDoMultiOutputExtractsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.MultiOutput parDo = mock(ParDo.MultiOutput.class, "parDo");
-    AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
-    when(parDo.getFn()).thenReturn(fn);
-
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Max.ofLongs());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
-
-    TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
-    when(transformNode.getTransform()).thenReturn(parDo);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorOne));
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(2, aggregatorSteps.size());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo");
-    @SuppressWarnings("rawtypes")
-    ParDo.MultiOutput otherParDo = mock(ParDo.MultiOutput.class, "otherParDo");
-    AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
-    when(parDo.getFn()).thenReturn(fn);
-    when(otherParDo.getFn()).thenReturn(fn);
-
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
-    Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles());
-
-    TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
-    when(transformNode.getTransform()).thenReturn(parDo);
-    TransformHierarchy.Node otherTransformNode = mock(TransformHierarchy.Node.class);
-    when(otherTransformNode.getTransform()).thenReturn(otherParDo);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(
-        ImmutableSet.<PTransform<?, ?>>of(parDo, otherParDo), aggregatorSteps.get(aggregatorOne));
-    assertEquals(
-        ImmutableSet.<PTransform<?, ?>>of(parDo, otherParDo), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(2, aggregatorSteps.size());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testGetAggregatorStepsWithDifferentStepsAddsSteps() {
-    @SuppressWarnings("rawtypes")
-    ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo");
-
-    AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
-    Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs());
-
-    when(parDo.getFn()).thenReturn(fn);
-
-    @SuppressWarnings("rawtypes")
-    ParDo.MultiOutput otherParDo = mock(ParDo.MultiOutput.class, "otherParDo");
-
-    AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
-    Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles());
-
-    when(otherParDo.getFn()).thenReturn(otherFn);
-
-    TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
-    when(transformNode.getTransform()).thenReturn(parDo);
-    TransformHierarchy.Node otherTransformNode = mock(TransformHierarchy.Node.class);
-    when(otherTransformNode.getTransform()).thenReturn(otherParDo);
-
-    doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode)))
-        .when(p)
-        .traverseTopologically(Mockito.any(PipelineVisitor.class));
-
-    AggregatorPipelineExtractor extractor = new AggregatorPipelineExtractor(p);
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        extractor.getAggregatorSteps();
-
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorOne));
-    assertEquals(ImmutableSet.<PTransform<?, ?>>of(otherParDo), aggregatorSteps.get(aggregatorTwo));
-    assertEquals(2, aggregatorSteps.size());
-  }
-
-  private static class VisitNodesAnswer implements Answer<Object> {
-    private final List<TransformHierarchy.Node> nodes;
-
-    public VisitNodesAnswer(List<TransformHierarchy.Node> nodes) {
-      this.nodes = nodes;
-    }
-
-    @Override
-    public Object answer(InvocationOnMock invocation) throws Throwable {
-      PipelineVisitor visitor = (PipelineVisitor) invocation.getArguments()[0];
-      for (TransformHierarchy.Node node : nodes) {
-        visitor.visitPrimitiveTransform(node);
-      }
-      return null;
-    }
-  }
-
-  private static class AggregatorProvidingDoFn<InT, OuT> extends DoFn<InT, OuT> {
-    public <InputT, OutT> Aggregator<InputT, OutT> addAggregator(
-        CombineFn<InputT, ?, OutT> combiner) {
-      return createAggregator(randomName(), combiner);
-    }
-
-    private String randomName() {
-      return UUID.randomUUID().toString();
-    }
-
-    @ProcessElement
-    public void processElement(DoFn<InT, OuT>.ProcessContext c) throws Exception {
-      fail();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index 4e80154..4b68317 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -18,20 +18,13 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -45,97 +38,6 @@ public class DoFnTest implements Serializable {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
-  private class NoOpDoFn extends DoFn<Void, Void> {
-
-    /**
-     * @param c context
-     */
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-    }
-  }
-
-  @Test
-  public void testCreateAggregatorWithCombinerSucceeds() {
-    String name = "testAggregator";
-    Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
-    DoFn<Void, Void> doFn = new NoOpDoFn();
-
-    Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
-    assertEquals(name, aggregator.getName());
-    assertEquals(combiner, aggregator.getCombineFn());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullNameThrowsException() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("name cannot be null");
-
-    DoFn<Void, Void> doFn = new NoOpDoFn();
-
-    doFn.createAggregator(null, Sum.ofLongs());
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullCombineFnThrowsException() {
-    CombineFn<Object, Object, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    DoFn<Void, Void> doFn = new NoOpDoFn();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
-    SerializableFunction<Iterable<Object>, Object> combiner = null;
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("combiner cannot be null");
-
-    DoFn<Void, Void> doFn = new NoOpDoFn();
-
-    doFn.createAggregator("testAggregator", combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorWithSameNameThrowsException() {
-    String name = "testAggregator";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    DoFn<Void, Void> doFn = new NoOpDoFn();
-
-    doFn.createAggregator(name, combiner);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Cannot create");
-    thrown.expectMessage(name);
-    thrown.expectMessage("already exists");
-
-    doFn.createAggregator(name, combiner);
-  }
-
-  @Test
-  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
-    String nameOne = "testAggregator";
-    String nameTwo = "aggregatorPrime";
-    CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
-    DoFn<Void, Void> doFn = new NoOpDoFn();
-
-    Aggregator<Double, Double> aggregatorOne =
-
-        doFn.createAggregator(nameOne, combiner);
-    Aggregator<Double, Double> aggregatorTwo =
-        doFn.createAggregator(nameTwo, combiner);
-
-    assertNotEquals(aggregatorOne, aggregatorTwo);
-  }
-
   @Test
   public void testDefaultPopulateDisplayDataImplementation() {
     DoFn<String, String> fn = new DoFn<String, String>() {
@@ -143,68 +45,4 @@ public class DoFnTest implements Serializable {
     DisplayData displayData = DisplayData.from(fn);
     assertThat(displayData.items(), empty());
   }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInStartBundleThrows() {
-    TestPipeline p = createTestPipeline(new DoFn<String, String>() {
-      @StartBundle
-      public void startBundle(Context c) {
-        createAggregator("anyAggregate", Max.ofIntegers());
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInProcessElementThrows() {
-    TestPipeline p = createTestPipeline(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        createAggregator("anyAggregate", Max.ofIntegers());
-      }
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCreateAggregatorInFinishBundleThrows() {
-    TestPipeline p = createTestPipeline(new DoFn<String, String>() {
-      @FinishBundle
-      public void finishBundle(Context c) {
-        createAggregator("anyAggregate", Max.ofIntegers());
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) {}
-    });
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-
-    p.run();
-  }
-
-  /**
-   * Initialize a test pipeline with the specified {@link DoFn}.
-   */
-  private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
-    pipeline.apply(Create.of((InputT) null))
-     .apply(ParDo.of(fn));
-
-    return pipeline;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 3b6fbfb..5c5718c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -29,6 +28,8 @@ import static org.junit.Assert.assertTrue;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -275,36 +276,6 @@ public class DoFnTesterTest {
   }
 
   @Test
-  public void getAggregatorValuesShouldGetValueOfCounter() throws Exception {
-    CounterDoFn counterDoFn = new CounterDoFn();
-    try (DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn)) {
-      tester.processBundle(1L, 2L, 4L, 8L);
-      assertThat(tester.getAggregatorValue(counterDoFn.agg), equalTo(15L));
-    }
-  }
-
-  @Test
-  public void getAggregatorValuesWithEmptyCounterShouldSucceed() throws Exception {
-    CounterDoFn counterDoFn = new CounterDoFn();
-    try (DoFnTester<Long, String> tester = DoFnTester.of(counterDoFn)) {
-      tester.processBundle();
-      // empty bundle
-      assertThat(tester.getAggregatorValue(counterDoFn.agg), equalTo(0L));
-    }
-  }
-
-  @Test
-  public void getAggregatorValuesInStartFinishBundleShouldGetValues() throws Exception {
-    CounterDoFn fn = new CounterDoFn();
-    try (DoFnTester<Long, String> tester = DoFnTester.of(fn)) {
-      tester.processBundle(1L, 2L, 3L, 4L);
-
-      assertThat(tester.getAggregatorValue(fn.startBundleCalls), equalTo(1L));
-      assertThat(tester.getAggregatorValue(fn.finishBundleCalls), equalTo(1L));
-    }
-  }
-
-  @Test
   public void peekValuesInWindow() throws Exception {
     try (DoFnTester<Long, String> tester = DoFnTester.of(new CounterDoFn())) {
       tester.startBundle();
@@ -407,11 +378,9 @@ public class DoFnTesterTest {
    * {@link DoFn.ProcessElement @ProcessElement}.
    */
   private static class CounterDoFn extends DoFn<Long, String> {
-    Aggregator<Long, Long> agg = createAggregator("ctr", Sum.ofLongs());
-    Aggregator<Long, Long> startBundleCalls =
-        createAggregator("startBundleCalls", Sum.ofLongs());
-    Aggregator<Long, Long> finishBundleCalls =
-        createAggregator("finishBundleCalls", Sum.ofLongs());
+    Counter agg = Metrics.counter(CounterDoFn.class, "ctr");
+    Counter startBundleCalls = Metrics.counter(CounterDoFn.class, "startBundleCalls");
+    Counter finishBundleCalls = Metrics.counter(CounterDoFn.class, "finishBundleCalls");
 
     private enum LifecycleState {
       UNINITIALIZED,
@@ -431,13 +400,13 @@ public class DoFnTesterTest {
     public void startBundle(Context c) {
       checkState(state == LifecycleState.SET_UP, "Wrong state: %s", state);
       state = LifecycleState.INSIDE_BUNDLE;
-      startBundleCalls.addValue(1L);
+      startBundleCalls.inc();
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       checkState(state == LifecycleState.INSIDE_BUNDLE, "Wrong state: %s", state);
-      agg.addValue(c.element());
+      agg.inc(c.element());
       Instant instant = new Instant(1000L * c.element());
       c.outputWithTimestamp(c.element().toString(), instant);
     }
@@ -446,7 +415,7 @@ public class DoFnTesterTest {
     public void finishBundle(Context c) {
       checkState(state == LifecycleState.INSIDE_BUNDLE, "Wrong state: %s", state);
       state = LifecycleState.SET_UP;
-      finishBundleCalls.addValue(1L);
+      finishBundleCalls.inc();
     }
 
     @Teardown

http://git-wip-us.apache.org/repos/asf/beam/blob/4253a60f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
index f49c765..b782b6e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
@@ -25,8 +25,6 @@ import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Objects;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -180,19 +178,6 @@ public class LatestFnTest {
   }
 
   @Test
-  public void testAggregator() throws Exception {
-    LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
-    DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
-    for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
-      harness.processTimestampedElement(element);
-    }
-
-    assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
-    assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
-    assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
-  }
-
-  @Test
   public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
     Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
 
@@ -205,29 +190,4 @@ public class LatestFnTest {
     assertThat("Default accumulator coder should handle null values",
         fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
   }
-
-  static class LatestAggregatorsFn<T> extends DoFn<T, T> {
-    private final T specialValue;
-    LatestAggregatorsFn(T specialValue) {
-      this.specialValue = specialValue;
-    }
-
-    Aggregator<TimestampedValue<T>, T> allValuesAgg =
-        createAggregator("allValues", new Latest.LatestFn<T>());
-
-    Aggregator<TimestampedValue<T>, T> specialValueAgg =
-        createAggregator("oneValue", new Latest.LatestFn<T>());
-
-    Aggregator<TimestampedValue<T>, T> noValuesAgg =
-        createAggregator("noValues", new Latest.LatestFn<T>());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
-      allValuesAgg.addValue(val);
-      if (Objects.equals(c.element(), specialValue)) {
-        specialValueAgg.addValue(val);
-      }
-    }
-  }
 }


Mime
View raw message