beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] incubator-beam git commit: Add input type to TransformResult
Date Mon, 28 Nov 2016 18:06:46 GMT
Add input type to TransformResult

This would likely have caught some hard-to-diagnose type safety errors
during the development of StatefulParDoEvaluatorFactory, so adding it
should hopefully catch similar bugs in the future.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7502adda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7502adda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7502adda

Branch: refs/heads/master
Commit: 7502adda3262bce9d6d4fe4499bde8d8b5273029
Parents: 9fbd2d2
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Nov 22 16:01:45 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Nov 28 10:06:31 2016 -0800

----------------------------------------------------------------------
 .../direct/AbstractModelEnforcement.java        |  2 +-
 .../direct/BoundedReadEvaluatorFactory.java     |  2 +-
 .../beam/runners/direct/CommittedResult.java    |  2 +-
 .../beam/runners/direct/CompletionCallback.java |  2 +-
 ...ecycleManagerRemovingTransformEvaluator.java |  2 +-
 .../runners/direct/EmptyTransformEvaluator.java |  4 +-
 .../beam/runners/direct/EvaluationContext.java  |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java |  2 +-
 .../runners/direct/FlattenEvaluatorFactory.java | 10 ++---
 .../GroupAlsoByWindowEvaluatorFactory.java      |  5 ++-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  2 +-
 .../direct/ImmutabilityEnforcementFactory.java  |  2 +-
 .../beam/runners/direct/ModelEnforcement.java   |  2 +-
 .../beam/runners/direct/ParDoEvaluator.java     |  2 +-
 .../direct/PassthroughTransformEvaluator.java   |  4 +-
 .../runners/direct/StepTransformResult.java     | 38 +++++++++--------
 .../direct/TestStreamEvaluatorFactory.java      |  2 +-
 .../beam/runners/direct/TransformEvaluator.java |  2 +-
 .../beam/runners/direct/TransformExecutor.java  |  4 +-
 .../beam/runners/direct/TransformResult.java    | 16 +++++--
 .../direct/UnboundedReadEvaluatorFactory.java   |  3 +-
 .../runners/direct/ViewEvaluatorFactory.java    |  2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  6 ++-
 .../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++---
 ...leManagerRemovingTransformEvaluatorTest.java |  4 +-
 .../runners/direct/EvaluationContextTest.java   | 20 ++++-----
 .../direct/FlattenEvaluatorFactoryTest.java     |  6 +--
 .../ImmutabilityEnforcementFactoryTest.java     |  6 +--
 .../beam/runners/direct/ParDoEvaluatorTest.java |  2 +-
 .../runners/direct/StepTransformResultTest.java | 25 ++++++-----
 .../direct/TestStreamEvaluatorFactoryTest.java  | 10 ++---
 .../runners/direct/TransformExecutorTest.java   | 45 ++++++++++----------
 .../UnboundedReadEvaluatorFactoryTest.java      | 20 ++++++---
 .../direct/WindowEvaluatorFactoryTest.java      | 12 +++---
 34 files changed, 152 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index 81f0f5f..f09164b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -33,6 +33,6 @@ abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
   @Override
   public void afterFinish(
       CommittedBundle<T> input,
-      TransformResult result,
+      TransformResult<T> result,
       Iterable<? extends CommittedBundle<?>> outputs) {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 66c55cd..65b622f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -161,7 +161,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle()  {
+    public TransformResult<BoundedSourceShard<OutputT>> finishBundle()  {
       return resultBuilder.build();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 5fcf7b3..4db7e18 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -59,7 +59,7 @@ abstract class CommittedResult {
   public abstract Set<OutputType> getProducedOutputTypes();
 
   public static CommittedResult create(
-      TransformResult original,
+      TransformResult<?> original,
       CommittedBundle<?> unprocessedElements,
       Iterable<? extends CommittedBundle<?>> outputs,
       Set<OutputType> producedOutputs) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 2986df1..766259d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -28,7 +28,7 @@ interface CompletionCallback {
    * Handle a successful result, returning the committed outputs of the result.
    */
   CommittedResult handleResult(
-      CommittedBundle<?> inputBundle, TransformResult result);
+      CommittedBundle<?> inputBundle, TransformResult<?> result);
 
   /**
    * Handle an input bundle that did not require processing.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
index faa0615..fb13b0f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluator.java
@@ -54,7 +54,7 @@ class DoFnLifecycleManagerRemovingTransformEvaluator<InputT> implements Transfor
   }
 
   @Override
-  public TransformResult finishBundle() throws Exception {
+  public TransformResult<InputT> finishBundle() throws Exception {
     try {
       return underlying.finishBundle();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
index 778c5aa..85e5e70 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
@@ -43,8 +43,8 @@ final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
   public void processElement(WindowedValue<T> element) throws Exception {}
 
   @Override
-  public TransformResult finishBundle() throws Exception {
-    return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
+  public TransformResult<T> finishBundle() throws Exception {
+    return StepTransformResult.<T>withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index b814def..c1225f6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -161,7 +161,7 @@ class EvaluationContext {
   public CommittedResult handleResult(
       @Nullable CommittedBundle<?> completedBundle,
       Iterable<TimerData> completedTimers,
-      TransformResult result) {
+      TransformResult<?> result) {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
     metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 05cdd34..b7908c5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -270,7 +270,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
     @Override
     public final CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, TransformResult result) {
+        CommittedBundle<?> inputBundle, TransformResult<?> result) {
       CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 57d5628..817e736 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -56,17 +56,17 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
           application) {
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(application.getOutput());
-    final TransformResult result =
-        StepTransformResult.withoutHold(application).addOutput(outputBundle).build();
+    final TransformResult<InputT> result =
+        StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
     return new FlattenEvaluator<>(outputBundle, result);
   }
 
   private static class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> {
     private final UncommittedBundle<InputT> outputBundle;
-    private final TransformResult result;
+    private final TransformResult<InputT> result;
 
     public FlattenEvaluator(
-        UncommittedBundle<InputT> outputBundle, TransformResult result) {
+        UncommittedBundle<InputT> outputBundle, TransformResult<InputT> result) {
       this.outputBundle = outputBundle;
       this.result = result;
     }
@@ -77,7 +77,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle() {
+    public TransformResult<InputT> finishBundle() {
       return result;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 36c742b..9d25bc6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -208,10 +208,11 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle() throws Exception {
+    public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
       // State is initialized within the constructor. It can never be null.
       CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
-      return StepTransformResult.withHold(application, state.getEarliestWatermarkHold())
+      return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
+              application, state.getEarliestWatermarkHold())
           .withState(state)
           .addOutput(outputBundles)
           .withTimerUpdate(stepContext.getTimerUpdate())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 0fa7ebd..4d691ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -143,7 +143,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle() {
+    public TransformResult<KV<K, V>> finishBundle() {
       Builder resultBuilder = StepTransformResult.withoutHold(application);
       for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
           groupingMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index 612922a..85fc374 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -74,7 +74,7 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
     @Override
     public void afterFinish(
         CommittedBundle<T> input,
-        TransformResult result,
+        TransformResult<T> result,
         Iterable<? extends CommittedBundle<?>> outputs) {
       for (MutationDetector detector : mutationElements.values()) {
         verifyUnmodified(detector);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index 074619a..25226f7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -58,6 +58,6 @@ public interface ModelEnforcement<T> {
    */
   void afterFinish(
       CommittedBundle<T> input,
-      TransformResult result,
+      TransformResult<T> result,
       Iterable<? extends CommittedBundle<?>> outputs);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 6f91319..254fa44 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -122,7 +122,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
   }
 
   @Override
-  public TransformResult finishBundle() {
+  public TransformResult<InputT> finishBundle() {
     try {
       fnRunner.finishBundle();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index c6e10e5..153af65 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -42,8 +42,8 @@ class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT
   }
 
   @Override
-  public TransformResult finishBundle() throws Exception {
-    return StepTransformResult.withoutHold(transform).addOutput(output).build();
+  public TransformResult<InputT> finishBundle() throws Exception {
+    return StepTransformResult.<InputT>withoutHold(transform).addOutput(output).build();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 5719e44..d58b027 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -37,18 +37,20 @@ import org.joda.time.Instant;
  * An immutable {@link TransformResult}.
  */
 @AutoValue
-public abstract class StepTransformResult implements TransformResult {
+public abstract class StepTransformResult<InputT> implements TransformResult<InputT> {
 
-  public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
+  public static <InputT> Builder<InputT> withHold(
+      AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
     return new Builder(transform, watermarkHold);
   }
 
-  public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
+  public static <InputT> Builder<InputT> withoutHold(
+      AppliedPTransform<?, ?, ?> transform) {
     return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
   @Override
-  public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) {
+  public TransformResult<InputT> withLogicalMetricUpdates(MetricUpdates metricUpdates) {
     return new AutoValue_StepTransformResult(
         getTransform(),
         getOutputBundles(),
@@ -64,10 +66,10 @@ public abstract class StepTransformResult implements TransformResult {
   /**
    * A builder for creating instances of {@link StepTransformResult}.
    */
-  public static class Builder {
+  public static class Builder<InputT> {
     private final AppliedPTransform<?, ?, ?> transform;
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
-    private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder;
+    private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElementsBuilder;
     private MetricUpdates metricUpdates;
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
@@ -85,8 +87,8 @@ public abstract class StepTransformResult implements TransformResult {
       this.metricUpdates = MetricUpdates.EMPTY;
     }
 
-    public StepTransformResult build() {
-      return new AutoValue_StepTransformResult(
+    public StepTransformResult<InputT> build() {
+      return new AutoValue_StepTransformResult<>(
           transform,
           bundlesBuilder.build(),
           unprocessedElementsBuilder.build(),
@@ -98,49 +100,51 @@ public abstract class StepTransformResult implements TransformResult {
           producedOutputs);
     }
 
-    public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
+    public Builder<InputT> withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
       this.aggregatorChanges = aggregatorChanges;
       return this;
     }
 
-    public Builder withMetricUpdates(MetricUpdates metricUpdates) {
+    public Builder<InputT> withMetricUpdates(MetricUpdates metricUpdates) {
       this.metricUpdates = metricUpdates;
       return this;
     }
 
-    public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
+    public Builder<InputT> withState(CopyOnAccessInMemoryStateInternals<?> state) {
       this.state = state;
       return this;
     }
 
-    public Builder withTimerUpdate(TimerUpdate timerUpdate) {
+    public Builder<InputT> withTimerUpdate(TimerUpdate timerUpdate) {
       this.timerUpdate = timerUpdate;
       return this;
     }
 
-    public Builder addUnprocessedElements(WindowedValue<?>... unprocessed) {
+    public Builder<InputT> addUnprocessedElements(WindowedValue<InputT>... unprocessed) {
       unprocessedElementsBuilder.addAll(Arrays.asList(unprocessed));
       return this;
     }
 
-    public Builder addUnprocessedElements(Iterable<? extends WindowedValue<?>> unprocessed) {
+    public Builder<InputT> addUnprocessedElements(
+        Iterable<? extends WindowedValue<InputT>> unprocessed) {
       unprocessedElementsBuilder.addAll(unprocessed);
       return this;
     }
 
-    public Builder addOutput(
+    public Builder<InputT> addOutput(
         UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
       bundlesBuilder.add(outputBundle);
       bundlesBuilder.add(outputBundles);
       return this;
     }
 
-    public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
+    public Builder<InputT> addOutput(
+        Collection<UncommittedBundle<?>> outputBundles) {
       bundlesBuilder.addAll(outputBundles);
       return this;
     }
 
-    public Builder withAdditionalOutput(OutputType producedAdditionalOutput) {
+    public Builder<InputT> withAdditionalOutput(OutputType producedAdditionalOutput) {
       producedOutputs.add(producedAdditionalOutput);
       return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 2ab6adf..9df7cdc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -127,7 +127,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle() throws Exception {
+    public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception {
       return resultBuilder.build();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
index 1624fcb..79c942b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java
@@ -42,5 +42,5 @@ public interface TransformEvaluator<InputT> {
    *
    * @return an {@link TransformResult} containing the results of this bundle evaluation.
    */
-  TransformResult finishBundle() throws Exception;
+  TransformResult<InputT> finishBundle() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index fb31cc9..bbc0aae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -159,11 +159,11 @@ class TransformExecutor<T> implements Runnable {
    * @return the {@link TransformResult} produced by
    *         {@link TransformEvaluator#finishBundle()}
    */
-  private TransformResult finishBundle(
+  private TransformResult<T> finishBundle(
       TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
       Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
-    TransformResult result = evaluator.finishBundle()
+    TransformResult<T> result = evaluator.finishBundle()
         .withLogicalMetricUpdates(metricsContainer.getCumulative());
     CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index ac1e395..b4797b0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -32,16 +33,25 @@ import org.joda.time.Instant;
 
 /**
  * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
+ *
+ * <p>Every transform evaluator has a defined input type, but {@link ParDo} has multiple outputs
+ * so there is not necesssarily a defined output type.
  */
-public interface TransformResult {
+public interface TransformResult<InputT> {
   /**
    * Returns the {@link AppliedPTransform} that produced this result.
+   *
+   * <p>This is treated as an opaque identifier so evaluators can delegate to other evaluators
+   * that may not have compatible types.
    */
   AppliedPTransform<?, ?, ?> getTransform();
 
   /**
    * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
    * will be committed by the evaluation context as part of completing this result.
+   *
+   * <p>Note that the bundles need not have a uniform type, for example in the case of multi-output
+   * {@link ParDo}.
    */
   Iterable<? extends UncommittedBundle<?>> getOutputBundles();
 
@@ -49,7 +59,7 @@ public interface TransformResult {
    * Returns elements that were provided to the {@link TransformEvaluator} as input but were not
    * processed.
    */
-  Iterable<? extends WindowedValue<?>> getUnprocessedElements();
+  Iterable<? extends WindowedValue<InputT>> getUnprocessedElements();
 
   /**
    * Returns the {@link AggregatorContainer.Mutator} used by this {@link PTransform}, or null if
@@ -97,5 +107,5 @@ public interface TransformResult {
    * Returns a new TransformResult based on this one but overwriting any existing logical metric
    * updates with {@code metricUpdates}.
    */
-  TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates);
+  TransformResult<InputT> withLogicalMetricUpdates(MetricUpdates metricUpdates);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 24a91cb..a4aebc9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -229,7 +229,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle() throws IOException {
+    public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle()
+        throws IOException {
       return resultBuilder.build();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 2dd280a..b92ade1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -81,7 +81,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       }
 
       @Override
-      public TransformResult finishBundle() {
+      public TransformResult<Iterable<InT>> finishBundle() {
         writer.add(elements);
         Builder resultBuilder = StepTransformResult.withoutHold(application);
         if (!elements.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index eb53b7f..991addf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -103,8 +103,10 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public TransformResult finishBundle() throws Exception {
-      return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
+    public TransformResult<InputT> finishBundle() throws Exception {
+      return StepTransformResult.<InputT>withoutHold(transform)
+          .addOutput(outputBundle)
+          .build();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index e956c34..dee95a7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -110,7 +110,7 @@ public class BoundedReadEvaluatorFactoryTest {
       for (WindowedValue<?> shard : shardBundle.getElements()) {
         evaluator.processElement((WindowedValue) shard);
       }
-      TransformResult result = evaluator.finishBundle();
+      TransformResult<?> result = evaluator.finishBundle();
       assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
       assertThat(
           Iterables.size(result.getOutputBundles()),
@@ -154,11 +154,11 @@ public class BoundedReadEvaluatorFactoryTest {
 
       Collection<CommittedBundle<?>> newUnreadInputs = new ArrayList<>();
       for (CommittedBundle<?> shardBundle : unreadInputs) {
-        TransformEvaluator<?> evaluator = factory.forApplication(transform, null);
+        TransformEvaluator<Long> evaluator = factory.forApplication(transform, null);
         for (WindowedValue<?> shard : shardBundle.getElements()) {
           evaluator.processElement((WindowedValue) shard);
         }
-        TransformResult result = evaluator.finishBundle();
+        TransformResult<Long> result = evaluator.finishBundle();
         assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
         assertThat(
             Iterables.size(result.getOutputBundles()),
@@ -207,7 +207,7 @@ public class BoundedReadEvaluatorFactoryTest {
       for (WindowedValue<?> shard : shardBundle.getElements()) {
         evaluator.processElement((WindowedValue) shard);
       }
-      TransformResult result = evaluator.finishBundle();
+      TransformResult<?> result = evaluator.finishBundle();
       assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
       assertThat(
           Iterables.size(result.getOutputBundles()),
@@ -277,7 +277,7 @@ public class BoundedReadEvaluatorFactoryTest {
       when(context.createBundle(longs)).thenReturn(outputBundle);
       evaluator.processElement(shard);
     }
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<?> result = evaluator.finishBundle();
     assertThat(Iterables.size(result.getOutputBundles()), equalTo(splits.size()));
 
     List<WindowedValue<?>> outputElems = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 9e2732e..b5eec63 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -115,7 +115,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
     }
 
     @Override
-    public TransformResult finishBundle() throws Exception {
+    public TransformResult<Object> finishBundle() throws Exception {
       finishBundleCalled = true;
       return null;
     }
@@ -128,7 +128,7 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
     }
 
     @Override
-    public TransformResult finishBundle() throws Exception {
+    public TransformResult<Object> finishBundle() throws Exception {
       throw new Exception();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index e1277ac..9a3959d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -250,7 +250,7 @@ public class EvaluationContextTest {
     AggregatorContainer.Mutator mutator = container.createMutator();
     mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L);
 
-    TransformResult result =
+    TransformResult<?> result =
         StepTransformResult.withoutHold(created.getProducingTransformInternal())
             .withAggregatorChanges(mutator)
             .build();
@@ -260,7 +260,7 @@ public class EvaluationContextTest {
     AggregatorContainer.Mutator mutatorAgain = container.createMutator();
     mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L);
 
-    TransformResult secondResult =
+    TransformResult<?> secondResult =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
             .withAggregatorChanges(mutatorAgain)
             .build();
@@ -286,7 +286,7 @@ public class EvaluationContextTest {
     bag.add(2);
     bag.add(4);
 
-    TransformResult stateResult =
+    TransformResult<?> stateResult =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
             .withState(state)
             .build();
@@ -319,7 +319,7 @@ public class EvaluationContextTest {
     context.scheduleAfterOutputWouldBeProduced(
         downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
 
-    TransformResult result =
+    TransformResult<?> result =
         StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
             .build();
 
@@ -328,7 +328,7 @@ public class EvaluationContextTest {
     // will likely be flaky if this logic is broken
     assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
 
-    TransformResult finishedResult =
+    TransformResult<?> finishedResult =
         StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
     context.forceRefresh();
@@ -338,7 +338,7 @@ public class EvaluationContextTest {
 
   @Test
   public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
-    TransformResult finishedResult =
+    TransformResult<?> finishedResult =
         StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
 
@@ -358,7 +358,7 @@ public class EvaluationContextTest {
 
   @Test
   public void extractFiredTimersExtractsTimers() {
-    TransformResult holdResult =
+    TransformResult<?> holdResult =
         StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
@@ -366,7 +366,7 @@ public class EvaluationContextTest {
     StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
     TimerData toFire =
         TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
-    TransformResult timerResult =
+    TransformResult<?> timerResult =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
             .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
             .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
@@ -382,7 +382,7 @@ public class EvaluationContextTest {
     // timer hasn't fired
     assertThat(context.extractFiredTimers(), emptyIterable());
 
-    TransformResult advanceResult =
+    TransformResult<?> advanceResult =
         StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     // Should cause the downstream timer to fire
     context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
@@ -460,7 +460,7 @@ public class EvaluationContextTest {
         context.handleResult(
             null,
             ImmutableList.<TimerData>of(),
-            StepTransformResult.withoutHold(created.getProducingTransformInternal())
+            StepTransformResult.<Integer>withoutHold(created.getProducingTransformInternal())
                 .addOutput(rootBundle)
                 .build());
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 417aa64..cb27fbc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -84,8 +84,8 @@ public class FlattenEvaluatorFactoryTest {
     rightSideEvaluator.processElement(
         WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)));
 
-    TransformResult rightSideResult = rightSideEvaluator.finishBundle();
-    TransformResult leftSideResult = leftSideEvaluator.finishBundle();
+    TransformResult<Integer> rightSideResult = rightSideEvaluator.finishBundle();
+    TransformResult<Integer> leftSideResult = leftSideEvaluator.finishBundle();
 
     assertThat(
         rightSideResult.getOutputBundles(),
@@ -131,7 +131,7 @@ public class FlattenEvaluatorFactoryTest {
             flattened.getProducingTransformInternal(),
             bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
 
-    TransformResult leftSideResult = emptyEvaluator.finishBundle();
+    TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle();
 
     CommittedBundle<?> outputBundle =
         Iterables.getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index a7277fe..a65cd30 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -78,7 +78,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     enforcement.afterElement(element);
     enforcement.afterFinish(
         elements,
-        StepTransformResult.withoutHold(consumer).build(),
+        StepTransformResult.<byte[]>withoutHold(consumer).build(),
         Collections.<CommittedBundle<?>>emptyList());
   }
 
@@ -98,7 +98,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     enforcement.afterElement(element);
     enforcement.afterFinish(
         elements,
-        StepTransformResult.withoutHold(consumer).build(),
+        StepTransformResult.<byte[]>withoutHold(consumer).build(),
         Collections.<CommittedBundle<?>>emptyList());
   }
 
@@ -120,7 +120,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     thrown.expectMessage("Input values must not be mutated");
     enforcement.afterFinish(
         elements,
-        StepTransformResult.withoutHold(consumer).build(),
+        StepTransformResult.<byte[]>withoutHold(consumer).build(),
         Collections.<CommittedBundle<?>>emptyList());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index eab92f4..85e99c5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -112,7 +112,7 @@ public class ParDoEvaluatorTest {
     evaluator.processElement(first);
     evaluator.processElement(second);
     evaluator.processElement(third);
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<Integer> result = evaluator.finishBundle();
 
     assertThat(
         result.getUnprocessedElements(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index 61f5812..a21d8f7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -56,34 +56,37 @@ public class StepTransformResultTest {
   @Test
   public void producedBundlesProducedOutputs() {
     UncommittedBundle<Integer> bundle = bundleFactory.createBundle(pc);
-    TransformResult result = StepTransformResult.withoutHold(transform).addOutput(bundle)
-        .build();
+    TransformResult<Integer> result =
+        StepTransformResult.<Integer>withoutHold(transform).addOutput(bundle).build();
 
-    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle>containsInAnyOrder(bundle));
+    assertThat(
+        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(bundle));
   }
 
   @Test
   public void withAdditionalOutputProducedOutputs() {
-    TransformResult result = StepTransformResult.withoutHold(transform)
-        .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
-        .build();
+    TransformResult<Integer> result =
+        StepTransformResult.<Integer>withoutHold(transform)
+            .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
+            .build();
 
     assertThat(result.getOutputTypes(), containsInAnyOrder(OutputType.PCOLLECTION_VIEW));
   }
 
   @Test
   public void producedBundlesAndAdditionalOutputProducedOutputs() {
-    TransformResult result = StepTransformResult.withoutHold(transform)
-        .addOutput(bundleFactory.createBundle(pc))
-        .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
-        .build();
+    TransformResult<Integer> result =
+        StepTransformResult.<Integer>withoutHold(transform)
+            .addOutput(bundleFactory.createBundle(pc))
+            .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
+            .build();
 
     assertThat(result.getOutputTypes(), hasItem(OutputType.PCOLLECTION_VIEW));
   }
 
   @Test
   public void noBundlesNoAdditionalOutputProducedOutputsFalse() {
-    TransformResult result = StepTransformResult.withoutHold(transform).build();
+    TransformResult<Integer> result = StepTransformResult.<Integer>withoutHold(transform).build();
 
     assertThat(result.getOutputTypes(), emptyIterable());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 94a0d41..3d31df6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -90,7 +90,7 @@ public class TestStreamEvaluatorFactoryTest {
     TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator =
         factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle);
     firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements()));
-    TransformResult firstResult = firstEvaluator.finishBundle();
+    TransformResult<TestStreamIndex<Integer>> firstResult = firstEvaluator.finishBundle();
 
     WindowedValue<TestStreamIndex<Integer>> firstResidual =
         (WindowedValue<TestStreamIndex<Integer>>)
@@ -103,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest {
     TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator =
         factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle);
     secondEvaluator.processElement(firstResidual);
-    TransformResult secondResult = secondEvaluator.finishBundle();
+    TransformResult<TestStreamIndex<Integer>> secondResult = secondEvaluator.finishBundle();
 
     WindowedValue<TestStreamIndex<Integer>> secondResidual =
         (WindowedValue<TestStreamIndex<Integer>>)
@@ -116,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest {
     TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator =
         factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle);
     thirdEvaluator.processElement(secondResidual);
-    TransformResult thirdResult = thirdEvaluator.finishBundle();
+    TransformResult<TestStreamIndex<Integer>> thirdResult = thirdEvaluator.finishBundle();
 
     WindowedValue<TestStreamIndex<Integer>> thirdResidual =
         (WindowedValue<TestStreamIndex<Integer>>)
@@ -130,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest {
     TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator =
         factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle);
     fourthEvaluator.processElement(thirdResidual);
-    TransformResult fourthResult = fourthEvaluator.finishBundle();
+    TransformResult<TestStreamIndex<Integer>> fourthResult = fourthEvaluator.finishBundle();
 
     assertThat(clock.now(), equalTo(start.plus(Duration.standardMinutes(10))));
     WindowedValue<TestStreamIndex<Integer>> fourthResidual =
@@ -144,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest {
     TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator =
         factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle);
     fifthEvaluator.processElement(fourthResidual);
-    TransformResult fifthResult = fifthEvaluator.finishBundle();
+    TransformResult<TestStreamIndex<Integer>> fifthResult = fifthEvaluator.finishBundle();
 
     assertThat(
         Iterables.getOnlyElement(firstResult.getOutputBundles())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 0b7b882..85eff65 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -96,7 +95,7 @@ public class TransformExecutorTest {
 
   @Test
   public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
-    final TransformResult result =
+    final TransformResult<Object> result =
         StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     final AtomicBoolean finishCalled = new AtomicBoolean(false);
     TransformEvaluator<Object> evaluator =
@@ -107,7 +106,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<Object> finishBundle() throws Exception {
             finishCalled.set(true);
             return result;
           }
@@ -128,7 +127,7 @@ public class TransformExecutorTest {
     executor.run();
 
     assertThat(finishCalled.get(), is(true));
-    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result));
     assertThat(completionCallback.handledException, is(nullValue()));
   }
 
@@ -154,8 +153,8 @@ public class TransformExecutorTest {
 
   @Test
   public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
-    final TransformResult result =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final TransformResult<String> result =
+        StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
     final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
     TransformEvaluator<String> evaluator =
         new TransformEvaluator<String>() {
@@ -166,7 +165,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<String> finishBundle() throws Exception {
             return result;
           }
         };
@@ -194,14 +193,14 @@ public class TransformExecutorTest {
     evaluatorCompleted.await();
 
     assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
-    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledResult, Matchers.<TransformResult<?>>equalTo(result));
     assertThat(completionCallback.handledException, is(nullValue()));
   }
 
   @Test
   public void processElementThrowsExceptionCallsback() throws Exception {
-    final TransformResult result =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final TransformResult<String> result =
+        StepTransformResult.<String>withoutHold(downstream.getProducingTransformInternal()).build();
     final Exception exception = new Exception();
     TransformEvaluator<String> evaluator =
         new TransformEvaluator<String>() {
@@ -211,7 +210,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<String> finishBundle() throws Exception {
             return result;
           }
         };
@@ -248,7 +247,7 @@ public class TransformExecutorTest {
           public void processElement(WindowedValue<String> element) throws Exception {}
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<String> finishBundle() throws Exception {
             throw exception;
           }
         };
@@ -277,7 +276,7 @@ public class TransformExecutorTest {
 
   @Test
   public void callWithEnforcementAppliesEnforcement() throws Exception {
-    final TransformResult result =
+    final TransformResult<Object> result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
 
     TransformEvaluator<Object> evaluator =
@@ -286,7 +285,7 @@ public class TransformExecutorTest {
           public void processElement(WindowedValue<Object> element) throws Exception {}
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<Object> finishBundle() throws Exception {
             return result;
           }
         };
@@ -317,7 +316,7 @@ public class TransformExecutorTest {
     assertThat(
         testEnforcement.afterElements,
         Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem));
-    assertThat(testEnforcement.finishedBundles, contains(result));
+    assertThat(testEnforcement.finishedBundles, Matchers.<TransformResult<?>>contains(result));
   }
 
   @Test
@@ -333,7 +332,7 @@ public class TransformExecutorTest {
               }
             });
 
-    final TransformResult result =
+    final TransformResult<Object> result =
         StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
     final CountDownLatch testLatch = new CountDownLatch(1);
     final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -344,7 +343,7 @@ public class TransformExecutorTest {
           public void processElement(WindowedValue<Object> element) throws Exception {}
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<Object> finishBundle() throws Exception {
             testLatch.countDown();
             evaluatorLatch.await();
             return result;
@@ -389,7 +388,7 @@ public class TransformExecutorTest {
               }
             });
 
-    final TransformResult result =
+    final TransformResult<Object> result =
         StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
     final CountDownLatch testLatch = new CountDownLatch(1);
     final CountDownLatch evaluatorLatch = new CountDownLatch(1);
@@ -403,7 +402,7 @@ public class TransformExecutorTest {
           }
 
           @Override
-          public TransformResult finishBundle() throws Exception {
+          public TransformResult<Object> finishBundle() throws Exception {
             return result;
           }
         };
@@ -434,7 +433,7 @@ public class TransformExecutorTest {
   }
 
   private static class RegisteringCompletionCallback implements CompletionCallback {
-    private TransformResult handledResult = null;
+    private TransformResult<?> handledResult = null;
     private boolean handledEmpty = false;
     private Exception handledException = null;
     private final CountDownLatch onMethod;
@@ -444,7 +443,7 @@ public class TransformExecutorTest {
     }
 
     @Override
-    public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult result) {
+    public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result) {
       handledResult = result;
       onMethod.countDown();
       @SuppressWarnings("rawtypes")
@@ -490,7 +489,7 @@ public class TransformExecutorTest {
   private static class TestEnforcement<T> implements ModelEnforcement<T> {
     private final List<WindowedValue<T>> beforeElements = new ArrayList<>();
     private final List<WindowedValue<T>> afterElements = new ArrayList<>();
-    private final List<TransformResult> finishedBundles = new ArrayList<>();
+    private final List<TransformResult<?>> finishedBundles = new ArrayList<>();
 
     @Override
     public void beforeElement(WindowedValue<T> element) {
@@ -505,7 +504,7 @@ public class TransformExecutorTest {
     @Override
     public void afterFinish(
         CommittedBundle<T> input,
-        TransformResult result,
+        TransformResult<T> result,
         Iterable<? extends CommittedBundle<?>> outputs) {
       finishedBundles.add(result);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 8d38275..5a10134 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -159,9 +159,10 @@ public class UnboundedReadEvaluatorFactoryTest {
             longs.getProducingTransformInternal(), inputShards);
 
     evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle();
 
-    WindowedValue<?> residual = Iterables.getOnlyElement(result.getUnprocessedElements());
+    WindowedValue<? super UnboundedSourceShard<Long, ?>> residual =
+        Iterables.getOnlyElement(result.getUnprocessedElements());
     assertThat(
         residual.getTimestamp(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
     UnboundedSourceShard<Long, ?> residualShard =
@@ -206,7 +207,8 @@ public class UnboundedReadEvaluatorFactoryTest {
       evaluator.processElement(
           (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
     }
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+        evaluator.finishBundle();
     assertThat(
         output.commit(Instant.now()).getElements(),
         containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));
@@ -248,7 +250,8 @@ public class UnboundedReadEvaluatorFactoryTest {
       evaluator.processElement(
           (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>) value);
     }
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+        evaluator.finishBundle();
 
     // Read from the residual of the first read. This should not produce any output, but should
     // include a residual shard in the result.
@@ -261,7 +264,8 @@ public class UnboundedReadEvaluatorFactoryTest {
             Iterables.getOnlyElement(result.getUnprocessedElements());
     secondEvaluator.processElement(residual);
 
-    TransformResult secondResult = secondEvaluator.finishBundle();
+    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> secondResult =
+        secondEvaluator.finishBundle();
 
     // Sanity check that nothing was output (The test would have to run for more than a day to do
     // so correctly.)
@@ -308,7 +312,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+        evaluator.finishBundle();
 
     CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
         inputBundle.withElements(
@@ -350,7 +355,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<UnboundedSourceShard<Long, TestCheckpointMark>> result =
+        evaluator.finishBundle();
 
     CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> residual =
         inputBundle.withElements(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 741f8f2..e2f987c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -118,7 +118,7 @@ public class WindowEvaluatorFactoryTest {
 
     UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle);
 
-    TransformResult result = runEvaluator(triggering, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(triggering, inputBundle, transform);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -143,7 +143,7 @@ public class WindowEvaluatorFactoryTest {
     BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
     BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
 
-    TransformResult result = runEvaluator(windowed, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -178,7 +178,7 @@ public class WindowEvaluatorFactoryTest {
     CommittedBundle<Long> inputBundle = createInputBundle();
     UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
 
-    TransformResult result = runEvaluator(windowed, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -235,7 +235,7 @@ public class WindowEvaluatorFactoryTest {
     CommittedBundle<Long> inputBundle = createInputBundle();
     UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
 
-    TransformResult result = runEvaluator(windowed, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -301,7 +301,7 @@ public class WindowEvaluatorFactoryTest {
     return outputBundle;
   }
 
-  private TransformResult runEvaluator(
+  private TransformResult<Long> runEvaluator(
       PCollection<Long> windowed,
       CommittedBundle<Long> inputBundle,
       Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
@@ -313,7 +313,7 @@ public class WindowEvaluatorFactoryTest {
     evaluator.processElement(valueInGlobalWindow);
     evaluator.processElement(valueInGlobalAndTwoIntervalWindows);
     evaluator.processElement(valueInIntervalWindow);
-    TransformResult result = evaluator.finishBundle();
+    TransformResult<Long> result = evaluator.finishBundle();
     return result;
   }
 


Mime
View raw message