beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Reduce Prevalence of PValue in the DirectRunner
Date Wed, 31 May 2017 17:38:58 GMT
Reduce Prevalence of PValue in the DirectRunner

Use PCollection or PCollectionView explicitly.

Retrieve views from the WriteView transform rather than visiting the
view as an output PValue.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a78bd3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a78bd3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a78bd3f

Branch: refs/heads/master
Commit: 6a78bd3f09c99f49c5f27d15b3791f200bf5d53d
Parents: dc70383
Author: Thomas Groh <tgroh@google.com>
Authored: Wed May 31 09:38:02 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed May 31 10:38:45 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectGraph.java | 26 +++++++++++---------
 .../beam/runners/direct/DirectGraphVisitor.java | 25 ++++++++++---------
 .../beam/runners/direct/DirectRunner.java       |  4 ++-
 .../beam/runners/direct/EvaluationContext.java  | 17 ++++++++++++-
 .../beam/runners/direct/WatermarkManager.java   | 19 +++++++++++---
 .../runners/direct/DirectGraphVisitorTest.java  |  3 +++
 .../beam/runners/direct/DirectGraphs.java       | 10 +++++++-
 .../runners/direct/EvaluationContextTest.java   |  6 ++++-
 8 files changed, 80 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
index 83b214a..c2c0afa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -24,9 +24,9 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 /**
@@ -34,39 +34,43 @@ import org.apache.beam.sdk.values.PValue;
  * executed with the {@link DirectRunner}.
  */
 class DirectGraph {
-  private final Map<POutput, AppliedPTransform<?, ?, ?>> producers;
+  private final Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers;
+  private final Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters;
   private final ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers;
-  private final Set<PCollectionView<?>> views;
 
   private final Set<AppliedPTransform<?, ?, ?>> rootTransforms;
   private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
 
   public static DirectGraph create(
-      Map<POutput, AppliedPTransform<?, ?, ?>> producers,
+      Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
+      Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
       ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
-      Set<PCollectionView<?>> views,
       Set<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
-    return new DirectGraph(producers, primitiveConsumers, views, rootTransforms, stepNames);
+    return new DirectGraph(producers, viewWriters, primitiveConsumers, rootTransforms, stepNames);
   }
 
   private DirectGraph(
-      Map<POutput, AppliedPTransform<?, ?, ?>> producers,
+      Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
+      Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
       ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
-      Set<PCollectionView<?>> views,
       Set<AppliedPTransform<?, ?, ?>> rootTransforms,
       Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
     this.producers = producers;
+    this.viewWriters = viewWriters;
     this.primitiveConsumers = primitiveConsumers;
-    this.views = views;
     this.rootTransforms = rootTransforms;
     this.stepNames = stepNames;
   }
 
-  AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
+  AppliedPTransform<?, ?, ?> getProducer(PCollection<?> produced) {
     return producers.get(produced);
   }
 
+  AppliedPTransform<?, ?, ?> getWriter(PCollectionView<?> view) {
+    return viewWriters.get(view);
+  }
+
   List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
     return primitiveConsumers.get(consumed);
   }
@@ -76,7 +80,7 @@ class DirectGraph {
   }
 
   Set<PCollectionView<?>> getViews() {
-    return views;
+    return viewWriters.keySet();
   }
 
   String getStepName(AppliedPTransform<?, ?, ?> step) {

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 01204e3..d54de5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 /**
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PValue;
  */
 class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
-  private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+  private Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+  private Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters = new HashMap<>();
 
   private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
       ArrayListMultimap.create();
 
-  private Set<PCollectionView<?>> views = new HashSet<>();
   private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
   private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
   private int numTransforms = 0;
@@ -86,17 +86,19 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
       for (PValue value : node.getInputs().values()) {
         primitiveConsumers.put(value, appliedTransform);
       }
+      if (node.getTransform() instanceof ViewOverrideFactory.WriteView) {
+        viewWriters.put(
+            ((ViewOverrideFactory.WriteView<?, ?>) node.getTransform()).getView(),
+            node.toAppliedPTransform(getPipeline()));
+      }
     }
   }
 
  @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
-    if (value instanceof PCollectionView) {
-      views.add((PCollectionView<?>) value);
-    }
-    if (!producers.containsKey(value)) {
-      producers.put(value, appliedTransform);
+    if (value instanceof PCollection && !producers.containsKey(value)) {
+      producers.put((PCollection<?>) value, appliedTransform);
     }
   }
 
@@ -111,11 +113,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
   }
 
   /**
-   * Get the graph constructed by this {@link DirectGraphVisitor}, which provides
-   * lookups for producers and consumers of {@link PValue PValues}.
+   * Get the graph constructed by this {@link DirectGraphVisitor}, which provides lookups for
+   * producers and consumers of {@link PValue PValues}.
    */
   public DirectGraph getGraph() {
     checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
-    return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames);
+    return DirectGraph.create(
+        producers, viewWriters, primitiveConsumers, rootTransforms, stepNames);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 69dea8f..dbd1ec4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -220,7 +221,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
    * iteration order based on the order at which elements are added to it.
    */
   @SuppressWarnings("rawtypes")
-  private List<PTransformOverride> defaultTransformOverrides() {
+  @VisibleForTesting
+  List<PTransformOverride> defaultTransformOverrides() {
     return ImmutableList.<PTransformOverride>builder()
         .add(
             PTransformOverride.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 88ce85a..e215070 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -276,7 +276,7 @@ class EvaluationContext {
    * callback will be executed regardless of whether values have been produced.
    */
   public void scheduleAfterOutputWouldBeProduced(
-      PValue value,
+      PCollection<?> value,
       BoundedWindow window,
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
@@ -287,6 +287,21 @@ class EvaluationContext {
   }
 
   /**
+   * Schedule a callback to be executed after output would be produced for the given window if there
+   * had been input.
+   */
+  public void scheduleAfterOutputWouldBeProduced(
+      PCollectionView<?> view,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    AppliedPTransform<?, ?, ?> producing = graph.getWriter(view);
+    callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+    fireAvailableCallbacks(producing);
+  }
+
+  /**
    * Schedule a callback to be executed after the given window is expired.
    *
    * <p>For example, upstream state associated with the window may be cleared.

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 4f1b831..40ce163 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -60,6 +60,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
@@ -790,6 +791,18 @@ class WatermarkManager {
     }
   }
 
+  private TransformWatermarks getValueWatermark(PValue pvalue) {
+    if (pvalue instanceof PCollection) {
+      return getTransformWatermark(graph.getProducer((PCollection<?>) pvalue));
+    } else if (pvalue instanceof PCollectionView<?>) {
+      return getTransformWatermark(graph.getWriter((PCollectionView<?>) pvalue));
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unknown type of %s %s", PValue.class.getSimpleName(), pvalue.getClass()));
+    }
+  }
+
   private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
     TransformWatermarks wms = transformToWatermarks.get(transform);
     if (wms == null) {
@@ -824,8 +837,7 @@ class WatermarkManager {
     }
     for (PValue pvalue : inputs.values()) {
       Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue))
-              .synchronizedProcessingOutputWatermark;
+          getValueWatermark(pvalue).synchronizedProcessingOutputWatermark;
       inputWmsBuilder.add(producerOutputWatermark);
     }
     return inputWmsBuilder.build();
@@ -838,8 +850,7 @@ class WatermarkManager {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
     for (PValue pvalue : inputs.values()) {
-      Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
+      Watermark producerOutputWatermark = getValueWatermark(pvalue).outputWatermark;
       inputWatermarksBuilder.add(producerOutputWatermark);
     }
     List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 7f46a0e..576edf3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -78,6 +78,9 @@ public class DirectGraphVisitorTest implements Serializable {
             .apply(View.<String>asList());
     PCollectionView<Object> singletonView =
         p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
+    p.replaceAll(
+        DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
+            .defaultTransformOverrides());
     p.traverseTopologically(visitor);
     assertThat(
         visitor.getGraph().getViews(),

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
index 2f048fa..43de091 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 
 /** Test utilities for the {@link DirectRunner}. */
@@ -30,6 +32,12 @@ final class DirectGraphs {
   }
 
   public static AppliedPTransform<?, ?, ?> getProducer(PValue value) {
-    return getGraph(value.getPipeline()).getProducer(value);
+    if (value instanceof PCollection) {
+      return getGraph(value.getPipeline()).getProducer((PCollection<?>) value);
+    } else if (value instanceof PCollectionView) {
+      return getGraph(value.getPipeline()).getWriter((PCollectionView<?>) value);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unexpected type of %s %s", PValue.class.getSimpleName(), value.getClass()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 80b04f8..c0e43d6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -101,6 +101,10 @@ public class EvaluationContextTest {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(GenerateSequence.from(0));
 
+    p.replaceAll(
+        DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
+            .defaultTransformOverrides());
+
     KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
     p.traverseTopologically(keyedPValueTrackingVisitor);
 
@@ -116,7 +120,7 @@ public class EvaluationContextTest {
 
     createdProducer = graph.getProducer(created);
     downstreamProducer = graph.getProducer(downstream);
-    viewProducer = graph.getProducer(view);
+    viewProducer = graph.getWriter(view);
     unboundedProducer = graph.getProducer(unbounded);
   }
 


Mime
View raw message