beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [3/4] incubator-beam git commit: Use the State of the Executor to drive progress
Date Thu, 04 Aug 2016 00:46:03 GMT
Use the State of the Executor to drive progress

Add the concept of Quiescence to ExecutorServiceParallelExecutor.

If the executor is Quiescent, it should interrogate root nodes for
additional work. If not, runs of the monitor should update the state as
appropriate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8e7963f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8e7963f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8e7963f4

Branch: refs/heads/master
Commit: 8e7963f498955f10db1867e106ee82068b5f0c2a
Parents: f7cc7e1
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Jul 22 13:47:43 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Aug 3 17:45:12 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    |  11 +-
 .../beam/runners/direct/CompletionCallback.java |   5 +-
 .../beam/runners/direct/EvaluationContext.java  |  10 +-
 .../direct/ExecutorServiceParallelExecutor.java | 108 ++++++++++++++++---
 .../runners/direct/StepTransformResult.java     |  26 ++---
 .../beam/runners/direct/TransformExecutor.java  |   2 +-
 .../beam/runners/direct/TransformResult.java    |  12 ++-
 .../runners/direct/ViewEvaluatorFactory.java    |   9 +-
 .../runners/direct/CommittedResultTest.java     |  10 +-
 .../runners/direct/EvaluationContextTest.java   |   3 +
 .../runners/direct/StepTransformResultTest.java |  26 +++--
 .../runners/direct/TransformExecutorTest.java   |  11 +-
 .../runners/direct/WatermarkManagerTest.java    |   6 +-
 13 files changed, 176 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index e9a40a8..7e0cd8e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 
 import com.google.auto.value.AutoValue;
 
+import java.util.Set;
+
 import javax.annotation.Nullable;
 
 /**
@@ -57,16 +59,21 @@ abstract class CommittedResult {
    * {@link CreatePCollectionView}) should explicitly set this to true. If {@link #getOutputs()}
    * returns a nonempty iterable, this will also return true.
    */
-  public abstract boolean producedOutputs();
+  public abstract Set<OutputType> getProducedOutputTypes();
 
   public static CommittedResult create(
       TransformResult original,
       CommittedBundle<?> unprocessedElements,
       Iterable<? extends CommittedBundle<?>> outputs,
-      boolean producedOutputs) {
+      Set<OutputType> producedOutputs) {
     return new AutoValue_CommittedResult(original.getTransform(),
         unprocessedElements,
         outputs,
         producedOutputs);
   }
+
+  enum OutputType {
+    PCOLLECTION_VIEW,
+    BUNDLE
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 2f496e9..8e51d6f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 
 /**
  * A callback for completing a bundle of input.
@@ -31,8 +32,10 @@ interface CompletionCallback {
 
   /**
    * Handle an input bundle that did not require processing.
+   *
+   * <p>This occurs when a Source has no splits that can currently produce outputs.
    */
-  void handleEmpty(CommittedBundle<?> inputBundle);
+  void handleEmpty(AppliedPTransform<?, ?, ?> transform);
 
   /**
    * Handle a result that terminated abnormally due to the provided {@link Throwable}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 610a62d..23c139d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
@@ -48,6 +49,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -155,12 +157,18 @@ class EvaluationContext {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
     // Update watermarks and timers
+    EnumSet<OutputType> outputTypes = EnumSet.copyOf(result.getOutputTypes());
+    if (Iterables.isEmpty(committedBundles)) {
+      outputTypes.remove(OutputType.BUNDLE);
+    } else {
+      outputTypes.add(OutputType.BUNDLE);
+    }
     CommittedResult committedResult = CommittedResult.create(result,
         completedBundle == null
             ? null
             : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
         committedBundles,
-        result.producedOutput());
+        outputTypes);
     watermarkManager.updateWatermarks(
         completedBundle,
         result.getTimerUpdate().withCompletedTimers(completedTimers),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 3901472..64836d8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 
 import com.google.auto.value.AutoValue;
@@ -54,6 +55,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
@@ -85,6 +88,18 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
 
+  private final AtomicReference<ExecutorState> state =
+      new AtomicReference<>(ExecutorState.QUIESCENT);
+
+  /**
+   * Measures the number of {@link TransformExecutor TransformExecutors} that have been scheduled
+   * but not yet completed.
+   *
+   * <p>Before a {@link TransformExecutor} is scheduled, this value is incremented. All methods in
+   * {@link CompletionCallback} decrement this value.
+   */
+  private final AtomicLong outstandingWork = new AtomicLong();
+
   public static ExecutorServiceParallelExecutor create(
       ExecutorService executorService,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
@@ -184,6 +199,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
             transform,
             onComplete,
             transformExecutor);
+    outstandingWork.incrementAndGet();
     transformExecutor.schedule(callable);
   }
 
@@ -240,16 +256,22 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
         allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs,
             Collections.<AppliedPTransform<?, ?, ?>>singleton(committedResult.getTransform())));
       }
+      if (!committedResult.getProducedOutputTypes().isEmpty()) {
+        state.set(ExecutorState.ACTIVE);
+      }
+      outstandingWork.decrementAndGet();
       return committedResult;
     }
 
     @Override
-    public void handleEmpty(CommittedBundle<?> inputBundle) {
+    public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
+      outstandingWork.decrementAndGet();
     }
 
     @Override
     public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
       allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+      outstandingWork.decrementAndGet();
     }
   }
 
@@ -330,6 +352,20 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       String oldName = Thread.currentThread().getName();
       Thread.currentThread().setName(runnableName);
       try {
+        boolean noWorkOutstanding = outstandingWork.get() == 0L;
+        ExecutorState startingState = state.get();
+        if (startingState == ExecutorState.ACTIVE) {
+          // The remainder of this call will add all available work to the Executor, and there will
+          // be no new work available
+          state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
+        } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) {
+          // The executor has consumed all new work and no new work was added
+          state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
+        } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) {
+          // The executor re-ran all blocked work and nothing could make progress.
+          state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
+        }
+        fireTimers();
         Collection<ExecutorUpdate> updates = new ArrayList<>();
         // Pull all available updates off of the queue before adding additional work. This ensures
         // both loops terminate.
@@ -341,14 +377,18 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
         for (ExecutorUpdate update : updates) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
-            scheduleConsumers(update);
+            if (ExecutorState.ACTIVE == startingState || (ExecutorState.PROCESSING == startingState
+                && noWorkOutstanding)) {
+              scheduleConsumers(update);
+            } else {
+              allUpdates.offer(update);
+            }
           } else if (update.getException().isPresent()) {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
             exceptionThrown = true;
           }
         }
-        boolean timersFired = fireTimers();
-        addWorkIfNecessary(timersFired);
+        addWorkIfNecessary();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         LOG.error("Monitor died due to being interrupted");
@@ -372,9 +412,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
     /**
      * Fires any available timers. Returns true if at least one timer was fired.
      */
-    private boolean fireTimers() throws Exception {
+    private void fireTimers() throws Exception {
       try {
-        boolean firedTimers = false;
         for (Map.Entry<
                 AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> transformTimers :
             evaluationContext.extractFiredTimers().entrySet()) {
@@ -395,12 +434,11 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
                           null, keyTimers.getKey(), (PCollection) transform.getInput())
                       .add(WindowedValue.valueInEmptyWindows(work))
                       .commit(Instant.now());
+              state.set(ExecutorState.ACTIVE);
               scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
-              firedTimers = true;
             }
           }
         }
-        return firedTimers;
       } catch (Exception e) {
         LOG.error("Internal Error while delivering timers", e);
         throw e;
@@ -426,17 +464,55 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
      * add more work from root nodes that may have additional work. This ensures that if a pipeline
      * has elements available from the root nodes it will add those elements when necessary.
      */
-    private void addWorkIfNecessary(boolean firedTimers) {
+    private void addWorkIfNecessary() {
       // If any timers have fired, they will add more work; We don't need to add more
-      if (firedTimers) {
-        return;
-      }
-      // All current TransformExecutors are blocked; add more work from the roots.
-      for (AppliedPTransform<?, ?, ?> root : rootNodes) {
-        if (!evaluationContext.isDone(root)) {
-          scheduleConsumption(root, null, defaultCompletionCallback);
+      if (state.get() == ExecutorState.QUIESCENT) {
+        // All current TransformExecutors are blocked; add more work from the roots.
+        for (AppliedPTransform<?, ?, ?> root : rootNodes) {
+          if (!evaluationContext.isDone(root)) {
+            scheduleConsumption(root, null, defaultCompletionCallback);
+            state.set(ExecutorState.ACTIVE);
+          }
         }
       }
     }
   }
+
+
+  /**
+   * The state of the executor. The state of the executor determines the behavior of the
+   * {@link MonitorRunnable} when it runs.
+   */
+  private enum ExecutorState {
+    /**
+     * Output has been produced since the last time the monitor ran. Work exists that has not yet
+     * been evaluated, and all pending, including potentially blocked work, should be evaluated.
+     *
+     * <p>The executor becomes active whenever a timer fires, a {@link PCollectionView} is updated,
+     * or output is produced by the evaluation of a {@link TransformExecutor}.
+     */
+    ACTIVE,
+    /**
+     * The Executor does not have any unevaluated work available to it, but work is in progress.
+     * Work should not be added until the Executor becomes active or no work is outstanding.
+     *
+     * <p>If all outstanding work completes without the executor becoming {@code ACTIVE}, the
+     * Executor enters state {@code QUIESCING}. Previously evaluated work must be reevaluated, in
+     * case a side input has made progress.
+     */
+    PROCESSING,
+    /**
+     * All outstanding work is work that may be blocked on a side input. When there is no
+     * outstanding work, the executor becomes {@code QUIESCENT}.
+     */
+    QUIESCING,
+    /**
+     * All elements are either buffered in state or are blocked on a side input. There are no
+     * timers that are permitted to fire but have not. There is no outstanding work.
+     *
+     * <p>The pipeline will not make progress without the progression of watermarks, the progression
+     * of processing time, or the addition of elements.
+     */
+    QUIESCENT
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 3d6841d..12b18cb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -17,21 +17,22 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 
 import org.joda.time.Instant;
 
 import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
 
 import javax.annotation.Nullable;
 
@@ -64,17 +65,7 @@ public abstract class StepTransformResult implements TransformResult {
   public abstract TimerUpdate getTimerUpdate();
 
   @Override
-  public boolean producedOutput() {
-    return !Iterables.isEmpty(getOutputBundles()) || producedAdditionalOutput();
-  }
-
-  /**
-   * Returns {@code true} if the step produced output that is not reflected in the Output Bundles.
-   *
-   * <p>If a step modifies the contents of a {@link PCollectionView}, this should return {@code
-   * true}.
-   */
-  abstract boolean producedAdditionalOutput();
+  public abstract Set<OutputType> getOutputTypes();
 
   public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
     return new Builder(transform, watermarkHold);
@@ -94,13 +85,14 @@ public abstract class StepTransformResult implements TransformResult {
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
     private AggregatorContainer.Mutator aggregatorChanges;
-    private boolean producedAdditionalOutput;
+    private final Set<OutputType> producedOutputs;
     private final Instant watermarkHold;
 
     private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
       this.transform = transform;
       this.watermarkHold = watermarkHold;
       this.bundlesBuilder = ImmutableList.builder();
+      this.producedOutputs = EnumSet.noneOf(OutputType.class);
       this.unprocessedElementsBuilder = ImmutableList.builder();
       this.timerUpdate = TimerUpdate.builder(null).build();
     }
@@ -114,7 +106,7 @@ public abstract class StepTransformResult implements TransformResult {
           watermarkHold,
           state,
           timerUpdate,
-          producedAdditionalOutput);
+          producedOutputs);
     }
 
     public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
@@ -149,8 +141,8 @@ public abstract class StepTransformResult implements TransformResult {
       return this;
     }
 
-    public Builder withAdditionalOutput(boolean producedAdditionalOutput) {
-      this.producedAdditionalOutput = producedAdditionalOutput;
+    public Builder withAdditionalOutput(OutputType producedAdditionalOutput) {
+      producedOutputs.add(producedAdditionalOutput);
       return this;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 793b508..3db941d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -111,7 +111,7 @@ class TransformExecutor<T> implements Runnable {
       TransformEvaluator<T> evaluator =
           evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
       if (evaluator == null) {
-        onComplete.handleEmpty(inputBundle);
+        onComplete.handleEmpty(transform);
         // Nothing to do
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index f678928..c01fa56 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -24,9 +25,11 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
 
 import org.joda.time.Instant;
+
+import java.util.Set;
+
 import javax.annotation.Nullable;
 
 /**
@@ -82,9 +85,8 @@ public interface TransformResult {
   TimerUpdate getTimerUpdate();
 
   /**
-   * Returns whether output was produced by the evaluation of this transform. True if
-   * {@link #getOutputBundles()} is nonempty, or if pipeline-visible state has changed (for example,
-   * the contents of a {@link PCollectionView} were updated).
+   * Returns the types of output produced by this {@link PTransform}. This may not include
+   * {@link OutputType#BUNDLE}, as empty bundles may be dropped when the transform is committed.
    */
-  boolean producedOutput();
+  Set<OutputType> getOutputTypes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 0a687ba..7a0b0f7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -77,8 +79,11 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       @Override
       public TransformResult finishBundle() {
         writer.add(elements);
-        return StepTransformResult.withoutHold(application)
-            .withAdditionalOutput(!elements.isEmpty())
+        Builder resultBuilder = StepTransformResult.withoutHold(application);
+        if (!elements.isEmpty()) {
+          resultBuilder = resultBuilder.withAdditionalOutput(OutputType.PCOLLECTION_VIEW);
+        }
+        return resultBuilder
             .build();
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 2ebd804..a8c647e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.direct;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -41,6 +42,7 @@ import org.junit.runners.JUnit4;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 
 /**
@@ -66,7 +68,7 @@ public class CommittedResultTest implements Serializable {
             StepTransformResult.withoutHold(transform).build(),
             bundleFactory.createRootBundle(created).commit(Instant.now()),
             Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
-            false);
+            EnumSet.noneOf(OutputType.class));
 
     assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
   }
@@ -82,7 +84,7 @@ public class CommittedResultTest implements Serializable {
             StepTransformResult.withoutHold(transform).build(),
             bundle,
             Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
-            false);
+            EnumSet.noneOf(OutputType.class));
 
     assertThat(result.getUnprocessedInputs(),
         Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle));
@@ -95,7 +97,7 @@ public class CommittedResultTest implements Serializable {
             StepTransformResult.withoutHold(transform).build(),
             null,
             Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
-            false);
+            EnumSet.noneOf(OutputType.class));
 
     assertThat(result.getUnprocessedInputs(), nullValue());
   }
@@ -114,7 +116,7 @@ public class CommittedResultTest implements Serializable {
             StepTransformResult.withoutHold(transform).build(),
             bundleFactory.createRootBundle(created).commit(Instant.now()),
             outputs,
-            true);
+            EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW));
 
     assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index cae7ffd..d4b5773 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -61,14 +61,17 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index 59e1c71..cfc69bc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -18,14 +18,19 @@
 
 package org.apache.beam.runners.direct;
 
-import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,35 +56,36 @@ public class StepTransformResultTest {
 
   @Test
   public void producedBundlesProducedOutputs() {
-    TransformResult result = StepTransformResult.withoutHold(transform)
-        .addOutput(bundleFactory.createRootBundle(pc))
+    UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pc);
+    TransformResult result = StepTransformResult.withoutHold(transform).addOutput(bundle)
         .build();
 
-    assertThat(result.producedOutput(), is(true));
+    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle>containsInAnyOrder(bundle));
   }
 
   @Test
   public void withAdditionalOutputProducedOutputs() {
-    TransformResult result =
-        StepTransformResult.withoutHold(transform).withAdditionalOutput(true).build();
+    TransformResult result = StepTransformResult.withoutHold(transform)
+        .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
+        .build();
 
-    assertThat(result.producedOutput(), is(true));
+    assertThat(result.getOutputTypes(), containsInAnyOrder(OutputType.PCOLLECTION_VIEW));
   }
 
   @Test
   public void producedBundlesAndAdditionalOutputProducedOutputs() {
     TransformResult result = StepTransformResult.withoutHold(transform)
         .addOutput(bundleFactory.createRootBundle(pc))
-        .withAdditionalOutput(true)
+        .withAdditionalOutput(OutputType.PCOLLECTION_VIEW)
         .build();
 
-    assertThat(result.producedOutput(), is(true));
+    assertThat(result.getOutputTypes(), hasItem(OutputType.PCOLLECTION_VIEW));
   }
 
   @Test
   public void noBundlesNoAdditionalOutputProducedOutputsFalse() {
     TransformResult result = StepTransformResult.withoutHold(transform).build();
 
-    assertThat(result.producedOutput(), is(false));
+    assertThat(result.getOutputTypes(), emptyIterable());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 7bd7ff2..f6cb8d1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -55,12 +56,14 @@ import org.mockito.MockitoAnnotations;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+
 /**
  * Tests for {@link TransformExecutor}.
  */
@@ -511,12 +514,14 @@ public class TransformExecutorTest {
 
       CommittedBundle<?> unprocessedBundle =
           inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
-      return CommittedResult.create(
-          result, unprocessedBundle, Collections.<CommittedBundle<?>>emptyList(), false);
+      return CommittedResult.create(result,
+          unprocessedBundle,
+          Collections.<CommittedBundle<?>>emptyList(),
+          EnumSet.noneOf(OutputType.class));
     }
 
     @Override
-    public void handleEmpty(CommittedBundle<?> inputBundle) {
+    public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
       handledEmpty = true;
       onMethod.countDown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e7963f4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 7edfc5d..7c7005c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
@@ -72,6 +73,7 @@ import org.junit.runners.JUnit4;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -1426,6 +1428,8 @@ public class WatermarkManagerTest implements Serializable {
         StepTransformResult.withoutHold(transform).build(),
         unprocessedBundle,
         bundles,
-        !Iterables.isEmpty(bundles));
+        Iterables.isEmpty(bundles)
+            ? EnumSet.noneOf(OutputType.class)
+            : EnumSet.of(OutputType.BUNDLE));
   }
 }


Mime
View raw message