beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2795) FlinkRunner: translate using SDK-agnostic means
Date Sat, 13 Jan 2018 01:32:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324873#comment-16324873
] 

ASF GitHub Bot commented on BEAM-2795:
--------------------------------------

kennknowles closed pull request #4343: [BEAM-2795] Use portable constructs in Flink batch
translator
URL: https://github.com/apache/beam/pull/4343
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index ff431fca434..562eced90a1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_TRANSFORM_URN;
 
 import com.google.auto.service.AutoService;
@@ -26,8 +27,10 @@
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -49,6 +52,8 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Methods for translating between {@link Combine.PerKey} {@link PTransform PTransforms}
and {@link
@@ -91,8 +96,7 @@ public FunctionSpec translate(
           "%s received transform with null spec",
           getClass().getSimpleName());
       checkArgument(protoTransform.getSpec().getUrn().equals(COMBINE_TRANSFORM_URN));
-      return new RawCombine<>(
-          CombinePayload.parseFrom(protoTransform.getSpec().getPayload()), rehydratedComponents);
+      return new RawCombine<>(protoTransform, rehydratedComponents);
     }
 
     /** Registers {@link CombinePayloadTranslator}. */
@@ -181,24 +185,52 @@ public SdkFunctionSpec getCombineFn() {
         components);
   }
 
+  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?,
?, ?> application)
+      throws IOException {
+    PTransform<?, ?> transform = application.getTransform();
+    if (transform instanceof Combine.PerKey) {
+      return ((Combine.PerKey<?, ?, ?>) transform).getSideInputs();
+    }
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform combineProto = PTransformTranslation.toProto(application, sdkComponents);
+    CombinePayload payload = CombinePayload.parseFrom(combineProto.getSpec().getPayload());
+
+    List<PCollectionView<?>> views = new ArrayList<>();
+    RehydratedComponents components =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
+    for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet())
{
+      String sideInputTag = sideInputEntry.getKey();
+      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+      PCollection<?> originalPCollection =
+          checkNotNull(
+              (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)),
+              "no input with tag %s",
+              sideInputTag);
+      views.add(
+          PCollectionViewTranslation.viewFromProto(sideInput, sideInputTag, originalPCollection,
+              combineProto, components));
+    }
+    return views;
+  }
+
   private static class RawCombine<K, InputT, AccumT, OutputT>
       extends PTransformTranslation.RawPTransform<
           PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
       implements CombineLike {
 
+    private final RunnerApi.PTransform protoTransform;
     private final transient RehydratedComponents rehydratedComponents;
     private final FunctionSpec spec;
     private final CombinePayload payload;
     private final Coder<AccumT> accumulatorCoder;
 
-    private RawCombine(CombinePayload payload, RehydratedComponents rehydratedComponents)
{
+    private RawCombine(RunnerApi.PTransform protoTransform,
+        RehydratedComponents rehydratedComponents) throws IOException {
+      this.protoTransform = protoTransform;
       this.rehydratedComponents = rehydratedComponents;
-      this.payload = payload;
-      this.spec =
-          FunctionSpec.newBuilder()
-              .setUrn(COMBINE_TRANSFORM_URN)
-              .setPayload(payload.toByteString())
-              .build();
+      this.spec = protoTransform.getSpec();
+      this.payload = CombinePayload.parseFrom(spec.getPayload());
 
       // Eagerly extract the coder to throw a good exception here
       try {
@@ -242,6 +274,25 @@ public SdkFunctionSpec getCombineFn() {
       return accumulatorCoder;
     }
 
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
+      for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet())
{
+        try {
+          additionalInputs.put(
+              new TupleTag<>(sideInputEntry.getKey()),
+              rehydratedComponents.getPCollection(
+                  protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
+        } catch (IOException exc) {
+          throw new IllegalStateException(
+              String.format(
+                  "Could not find input with name %s for %s transform",
+                  sideInputEntry.getKey(), Combine.class.getSimpleName()));
+        }
+      }
+      return additionalInputs;
+    }
+
     @Override
     public Map<String, SideInput> getSideInputs() {
       return payload.getSideInputsMap();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionViewTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionViewTranslation.java
new file mode 100644
index 00000000000..25361ed6753
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionViewTranslation.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+
+/** Utilities for interacting with PCollection view protos. */
+public class PCollectionViewTranslation {
+
+  /**
+   * Create a {@link PCollectionView} from a side input spec and an already-deserialized
{@link
+   * PCollection} that should be wired up.
+   */
+  public static PCollectionView<?> viewFromProto(
+      RunnerApi.SideInput sideInput,
+      String localName,
+      PCollection<?> pCollection,
+      RunnerApi.PTransform parDoTransform,
+      RehydratedComponents components)
+      throws IOException {
+    checkArgument(
+        localName != null,
+        "%s.viewFromProto: localName must not be null",
+        ParDoTranslation.class.getSimpleName());
+    TupleTag<?> tag = new TupleTag<>(localName);
+    WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
+    ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
+
+    WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
+    checkArgument(
+        sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN),
+        "Unknown View Materialization URN %s",
+        sideInput.getAccessPattern().getUrn());
+
+    PCollectionView<?> view =
+        new RunnerPCollectionView<>(
+            pCollection,
+            (TupleTag) tag,
+            (ViewFn) viewFn,
+            windowMappingFn,
+            windowingStrategy,
+            (Coder) pCollection.getCoder());
+    return view;
+  }
+
+  private static ViewFn<?, ?> viewFnFromProto(RunnerApi.SdkFunctionSpec viewFn)
+      throws InvalidProtocolBufferException {
+    RunnerApi.FunctionSpec spec = viewFn.getSpec();
+    checkArgument(
+        spec.getUrn().equals(ParDoTranslation.CUSTOM_JAVA_VIEW_FN_URN),
+        "Can't deserialize unknown %s type %s",
+        ViewFn.class.getSimpleName(),
+        spec.getUrn());
+    return (ViewFn<?, ?>)
+        SerializableUtils.deserializeFromByteArray(
+            spec.getPayload().toByteArray(), "Custom ViewFn");
+  }
+
+  private static WindowMappingFn<?> windowMappingFnFromProto(
+      RunnerApi.SdkFunctionSpec windowMappingFn)
+      throws InvalidProtocolBufferException {
+    RunnerApi.FunctionSpec spec = windowMappingFn.getSpec();
+    checkArgument(
+        spec.getUrn().equals(ParDoTranslation.CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
+        "Can't deserialize unknown %s type %s",
+        WindowMappingFn.class.getSimpleName(),
+        spec.getUrn());
+    return (WindowMappingFn<?>)
+        SerializableUtils.deserializeFromByteArray(
+            spec.getPayload().toByteArray(), "Custom WinodwMappingFn");
+  }
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index e00b912d1f6..ede5cfdc817 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -57,7 +57,6 @@
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
@@ -77,7 +76,6 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
 
 /** Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos.
*/
 public class ParDoTranslation {
@@ -323,7 +321,8 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?,
?, ?> ap
               "no input with tag %s",
               sideInputTag);
       views.add(
-          viewFromProto(sideInput, sideInputTag, originalPCollection, parDoProto, components));
+          PCollectionViewTranslation.viewFromProto(sideInput, sideInputTag, originalPCollection,
+              parDoProto, components));
     }
     return views;
   }
@@ -538,42 +537,6 @@ public static SideInput toProto(PCollectionView<?> view) {
     return builder.build();
   }
 
-  /**
-   * Create a {@link PCollectionView} from a side input spec and an already-deserialized
{@link
-   * PCollection} that should be wired up.
-   */
-  public static PCollectionView<?> viewFromProto(
-      SideInput sideInput,
-      String localName,
-      PCollection<?> pCollection,
-      RunnerApi.PTransform parDoTransform,
-      RehydratedComponents components)
-      throws IOException {
-    checkArgument(
-        localName != null,
-        "%s.viewFromProto: localName must not be null",
-        ParDoTranslation.class.getSimpleName());
-    TupleTag<?> tag = new TupleTag<>(localName);
-    WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
-    ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
-
-    WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
-    checkArgument(
-        sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN),
-        "Unknown View Materialization URN %s",
-        sideInput.getAccessPattern().getUrn());
-
-    PCollectionView<?> view =
-        new RunnerPCollectionView<>(
-            pCollection,
-            (TupleTag) tag,
-            (ViewFn) viewFn,
-            windowMappingFn,
-            windowingStrategy,
-            (Coder) pCollection.getCoder());
-    return view;
-  }
-
   private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
@@ -602,19 +565,6 @@ public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform)
throws
     return payload.getSplittable();
   }
 
-  private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn)
-      throws InvalidProtocolBufferException {
-    FunctionSpec spec = viewFn.getSpec();
-    checkArgument(
-        spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN),
-        "Can't deserialize unknown %s type %s",
-        ViewFn.class.getSimpleName(),
-        spec.getUrn());
-    return (ViewFn<?, ?>)
-        SerializableUtils.deserializeFromByteArray(
-            spec.getPayload().toByteArray(), "Custom ViewFn");
-  }
-
   private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
@@ -626,19 +576,6 @@ private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn)
{
         .build();
   }
 
-  private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn)
-      throws InvalidProtocolBufferException {
-    FunctionSpec spec = windowMappingFn.getSpec();
-    checkArgument(
-        spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN),
-        "Can't deserialize unknown %s type %s",
-        WindowMappingFn.class.getSimpleName(),
-        spec.getUrn());
-    return (WindowMappingFn<?>)
-        SerializableUtils.deserializeFromByteArray(
-            spec.getPayload().toByteArray(), "Custom WinodwMappingFn");
-  }
-
   static class RawParDo<InputT, OutputT>
       extends PTransformTranslation.RawPTransform<PCollection<InputT>, PCollection<OutputT>>
       implements ParDoLike {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index c8d38eb7ba8..d10d41c6c1a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -177,7 +177,7 @@ private static void addRehydratedTransform(
       PCollection<?> pCollection =
           (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
       views.add(
-          ParDoTranslation.viewFromProto(
+          PCollectionViewTranslation.viewFromProto(
               sideInput, localName, pCollection, transformProto, rehydratedComponents));
     }
     return PCollectionViews.toAdditionalInputs(views);
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 90f645373a1..e26974fa5b1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -155,7 +155,7 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable)
{
               "no input with tag %s",
               entry.getKey());
       views.add(
-          ParDoTranslation.viewFromProto(
+          PCollectionViewTranslation.viewFromProto(
               entry.getValue(),
               entry.getKey(),
               originalPCollection,
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 83594f1b9a2..ffdef3d2008 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -161,7 +161,7 @@ public void toAndFromTransformProto() throws Exception {
       for (PCollectionView<?> view : parDo.getSideInputs()) {
         SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
         PCollectionView<?> restoredView =
-            ParDoTranslation.viewFromProto(
+            PCollectionViewTranslation.viewFromProto(
                 sideInput,
                 view.getTagInternal().getId(),
                 view.getPCollection(),
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index a2a2e75f8fa..e0b906fd38d 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -250,6 +250,17 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-model-pipeline</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
index d22a5da252f..ba1275157f1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -94,8 +95,8 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     BatchTransformTranslator<?> translator =
         FlinkBatchTransformTranslators.getTranslator(transform);
     if (translator == null) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform
+      String transformUrn = PTransformTranslation.urnForTransform(transform);
+      throw new UnsupportedOperationException("The transform " + transformUrn
           + " is currently not supported.");
     }
     applyBatchTransform(transform, node, translator);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index a4265f59bf7..8731794db6e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -22,6 +22,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,7 +30,11 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.CombineTranslation;
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -47,27 +52,26 @@
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -129,12 +133,23 @@
   }
 
   private static class ReadSourceTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>>
{
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+        PTransform<PBegin, PCollection<T>>> {
 
     @Override
-    public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext
context) {
+    public void translateNode(PTransform<PBegin, PCollection<T>> transform,
+        FlinkBatchTranslationContext context) {
+      @SuppressWarnings("unchecked")
+      AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>
application =
+          (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+              context.getCurrentTransform();
+      BoundedSource<T> source;
+      try {
+        source = ReadTranslation.boundedSourceFromTransform(application);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
       String name = transform.getName();
-      BoundedSource<T> source = transform.getSource();
       PCollection<T> output = context.getOutput(transform);
 
       TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
@@ -151,10 +166,12 @@ public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContex
   }
 
   private static class WindowAssignTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>>
{
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+        PTransform<PCollection<T>, PCollection<T>>> {
 
     @Override
-    public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext
context) {
+    public void translateNode(PTransform<PCollection<T>, PCollection<T>>
transform,
+        FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
 
       TypeInformation<WindowedValue<T>> resultTypeInfo =
@@ -182,11 +199,12 @@ public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationConte
   }
 
   private static class GroupByKeyTranslatorBatch<K, InputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K,
InputT>> {
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>>
{
 
     @Override
     public void translateNode(
-        GroupByKey<K, InputT> transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>
transform,
         FlinkBatchTranslationContext context) {
 
       // for now, this is copied from the Combine.PerKey translater. Once we have the new
runner API
@@ -332,18 +350,23 @@ public void translateNode(
 
   private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          Combine.PerKey<K, InputT, OutputT>> {
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>
{
 
     @Override
     @SuppressWarnings("unchecked")
     public void translateNode(
-        Combine.PerKey<K, InputT, OutputT> transform,
+        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
transform,
         FlinkBatchTranslationContext context) {
       DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn =
-          (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) transform.getFn();
+      CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
+      try {
+            combineFn = (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>) CombineTranslation
+                .getCombineFn(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
 
       KvCoder<K, InputT> inputCoder =
           (KvCoder<K, InputT>) context.getInput(transform).getCoder();
@@ -373,7 +396,13 @@ public void translateNode(
       // construct a map from side input to WindowingStrategy so that
       // the DoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies
= new HashMap<>();
-      for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+      List<PCollectionView<?>> sideInputs;
+      try {
+        sideInputs = CombineTranslation.getSideInputs(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      for (PCollectionView<?> sideInput: sideInputs) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
       }
 
@@ -406,7 +435,7 @@ public void translateNode(
                 partialReduceFunction,
                 "GroupCombine: " + transform.getName());
 
-        transformSideInputs(transform.getSideInputs(), groupCombine, context);
+        transformSideInputs(sideInputs, groupCombine, context);
 
         TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
             context.getTypeInfo(context.getOutput(transform));
@@ -420,7 +449,7 @@ public void translateNode(
             new GroupReduceOperator<>(
                 intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
 
-        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+        transformSideInputs(sideInputs, outputDataSet, context);
 
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
@@ -445,7 +474,7 @@ public void translateNode(
             new GroupReduceOperator<>(
                 grouping, reduceTypeInfo, reduceFunction, transform.getName());
 
-        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+        transformSideInputs(sideInputs, outputDataSet, context);
 
         context.setOutputDataSet(context.getOutput(transform), outputDataSet);
       }
@@ -466,23 +495,34 @@ private static void rejectSplittable(DoFn<?, ?> doFn) {
 
   private static class ParDoTranslatorBatch<InputT, OutputT>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-      ParDo.MultiOutput<InputT, OutputT>> {
+      PTransform<PCollection<InputT>, PCollectionTuple>> {
 
     @Override
     @SuppressWarnings("unchecked")
     public void translateNode(
-        ParDo.MultiOutput<InputT, OutputT> transform,
+        PTransform<PCollection<InputT>, PCollectionTuple> transform,
         FlinkBatchTranslationContext context) {
-      DoFn<InputT, OutputT> doFn = transform.getFn();
+      DoFn<InputT, OutputT> doFn;
+      try {
+        doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
       rejectSplittable(doFn);
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
       Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
 
+      TupleTag<?> mainOutputTag;
+      try {
+        mainOutputTag = ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
       // put the main output at index 0, FlinkMultiOutputDoFnFunction  expects this
-      outputMap.put(transform.getMainOutputTag(), 0);
+      outputMap.put(mainOutputTag, 0);
       int count = 1;
       for (TupleTag<?> tag : outputs.keySet()) {
         if (!outputMap.containsKey(tag)) {
@@ -490,12 +530,19 @@ public void translateNode(
         }
       }
 
+      // Union coder elements must match the order of the output tags.
+      Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
+      for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
+        indexMap.put(entry.getValue(), entry.getKey());
+      }
+
       // assume that the windowing strategy is the same for all outputs
       WindowingStrategy<?, ?> windowingStrategy = null;
 
       // collect all output Coders and create a UnionCoder for our tagged outputs
       List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PValue taggedValue : outputs.values()) {
+      for (TupleTag<?> tag : indexMap.values()) {
+        PValue taggedValue = outputs.get(tag);
         checkState(
             taggedValue instanceof PCollection,
             "Within ParDo, got a non-PCollection output %s of type %s",
@@ -518,7 +565,12 @@ public void translateNode(
                   unionCoder,
                   windowingStrategy.getWindowFn().windowCoder()));
 
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+      List<PCollectionView<?>> sideInputs;
+      try {
+        sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
 
       // construct a map from side input to WindowingStrategy so that
       // the DoFn runner can map main-input windows to side input windows
@@ -528,9 +580,13 @@ public void translateNode(
       }
 
       SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>,
?> outputDataSet;
-      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-      if (signature.stateDeclarations().size() > 0
-          || signature.timerDeclarations().size() > 0) {
+      boolean usesStateOrTimers;
+      try {
+        usesStateOrTimers = ParDoTranslation.usesStateOrTimers(context.getCurrentTransform());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      if (usesStateOrTimers) {
 
         // Based on the fact that the signature is stateful, DoFnSignatures ensures
         // that it is also keyed
@@ -540,7 +596,7 @@ public void translateNode(
         FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
             (DoFn) doFn, context.getCurrentTransform().getFullName(),
             windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
-            outputMap, transform.getMainOutputTag()
+            outputMap, (TupleTag<OutputT>) mainOutputTag
         );
 
         Grouping<WindowedValue<InputT>> grouping =
@@ -558,7 +614,7 @@ public void translateNode(
                 sideInputStrategies,
                 context.getPipelineOptions(),
                 outputMap,
-                transform.getMainOutputTag());
+                mainOutputTag);
 
         outputDataSet = new MapPartitionOperator<>(
             inputDataSet, typeInformation,
@@ -601,12 +657,12 @@ public void translateNode(
 
   private static class FlattenPCollectionTranslatorBatch<T>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-      Flatten.PCollections<T>> {
+      PTransform<PCollectionList<T>, PCollection<T>>> {
 
     @Override
     @SuppressWarnings("unchecked")
     public void translateNode(
-        Flatten.PCollections<T> transform,
+        PTransform<PCollectionList<T>, PCollection<T>> transform,
         FlinkBatchTranslationContext context) {
 
       Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
@@ -661,17 +717,30 @@ public boolean filter(WindowedValue<T> tWindowedValue) throws
Exception {
 
   private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          View.CreatePCollectionView<ElemT, ViewT>> {
+          PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
 
     @Override
     public void translateNode(
-        View.CreatePCollectionView<ElemT, ViewT> transform,
+        PTransform<PCollection<ElemT>, PCollection<ElemT>> transform,
         FlinkBatchTranslationContext context) {
       DataSet<WindowedValue<ElemT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 
-      PCollectionView<ViewT> input = transform.getView();
-
+      @SuppressWarnings("unchecked")
+      AppliedPTransform<
+          PCollection<ElemT>,
+          PCollection<ElemT>,
+          PTransform<PCollection<ElemT>, PCollection<ElemT>>> application
=
+          (AppliedPTransform<
+              PCollection<ElemT>,
+              PCollection<ElemT>,
+              PTransform<PCollection<ElemT>, PCollection<ElemT>>>) context.getCurrentTransform();
+      PCollectionView<ViewT> input;
+      try {
+        input = CreatePCollectionViewTranslation.getView(application);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
       context.setSideInputDataSet(input, inputDataSet);
     }
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d2a2016c98a..f7b83f44dfd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.List;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.CollectionEnvironment;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> FlinkRunner: translate using SDK-agnostic means
> -----------------------------------------------
>
>                 Key: BEAM-2795
>                 URL: https://issues.apache.org/jira/browse/BEAM-2795
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Kenneth Knowles
>              Labels: portability
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Mime
View raw message