beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] beam git commit: Correct Apex translation of Single-input Flatten
Date Tue, 28 Feb 2017 17:20:37 GMT
Repository: beam
Updated Branches:
  refs/heads/master 2e072a032 -> 8998cb90d


Correct Apex translation of Single-input Flatten


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed602aae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed602aae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed602aae

Branch: refs/heads/master
Commit: ed602aae0010bbe1ee09c15e11ace3d11139d0dc
Parents: 2e072a0
Author: Thomas Weise <thw@apache.org>
Authored: Mon Feb 27 22:32:28 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Tue Feb 28 09:19:54 2017 -0800

----------------------------------------------------------------------
 .../FlattenPCollectionTranslator.java           |  9 ++++++---
 .../apex/translation/TranslationContext.java    | 14 ++++++++++++++
 .../FlattenPCollectionTranslatorTest.java       | 20 ++++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ed602aae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 928f135..2e31dfc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -43,9 +43,9 @@ class FlattenPCollectionTranslator<T> implements
 
   @Override
   public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext
context) {
-    List<TaggedPValue> inputs = context.getInputs();
+    List<PCollection<T>> inputCollections = extractPCollections(context.getInputs());
 
-    if (inputs.isEmpty()) {
+    if (inputCollections.isEmpty()) {
       // create a dummy source that never emits anything
       @SuppressWarnings("unchecked")
       UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
@@ -53,10 +53,13 @@ class FlattenPCollectionTranslator<T> implements
       ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
           unboundedSource, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
+    } else if (inputCollections.size() == 1) {
+      context.addAlias(context.getOutput(), inputCollections.get(0));
     } else {
+      @SuppressWarnings("unchecked")
       PCollection<T> output = (PCollection<T>) context.getOutput();
       Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
-      flattenCollections(extractPCollections(inputs), unionTags, output, context);
+      flattenCollections(inputCollections, unionTags, output, context);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ed602aae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index acd8ab1..fc49fc7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -57,6 +57,7 @@ class TranslationContext {
   private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>>
streams = new HashMap<>();
   private final Map<String, Operator> operators = new HashMap<>();
   private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
+  private Map<PInput, PInput> aliasCollections = new HashMap<>();
 
   public void addView(PCollectionView<?> view) {
     this.viewInputs.put(view, this.getInput());
@@ -145,11 +146,24 @@ class TranslationContext {
   }
 
   public void addStream(PInput input, InputPort inputPort) {
+    while (aliasCollections.containsKey(input)) {
+      input = aliasCollections.get(input);
+    }
     Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
     checkArgument(stream != null, "no upstream operator defined for %s", input);
     stream.getRight().add(inputPort);
   }
 
+  /**
+   * Set the given output as alias for another input,
+   * i.e. there won't be a stream representation in the target DAG.
+   * @param alias
+   * @param source
+   */
+  public void addAlias(PValue alias, PInput source) {
+    aliasCollections.put(alias, source);
+  }
+
   public void populateDAG(DAG dag) {
     for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
       dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());

http://git-wip-us.apache.org/repos/asf/beam/blob/ed602aae/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
index 7e678e8..b2e29b6 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.apex.translation;
 
+import com.datatorrent.api.DAG;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
@@ -25,6 +26,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
@@ -93,4 +97,20 @@ public class FlattenPCollectionTranslatorTest {
     }
   }
 
+  @Test
+  public void testFlattenSingleCollection() {
+    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    options.setRunner(ApexRunner.class);
+    ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
+    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+    DAG dag = launcher.getDAG();
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> single = p.apply(Create.of(Collections.singletonList("1")));
+    PCollectionList.of(single).apply(Flatten.<String>pCollections())
+      .apply(ParDo.of(new EmbeddedCollector()));
+    translator.translate(p, dag);
+    Assert.assertNotNull(dag.getOperatorMeta("ParDo(EmbeddedCollector)"));
+  }
+
 }


Mime
View raw message