beam-commits mailing list archives

From aljos...@apache.org
Subject [3/6] beam git commit: Move Flink Runner classes to base package and make package private
Date Wed, 22 Feb 2017 17:29:20 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
new file mode 100644
index 0000000..eaab3d1
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -0,0 +1,1043 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.FlinkCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains all the mappings between Beam and Flink
+ * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator}
+ * traverses the Beam job and comes here to translate the encountered Beam transformations
+ * into Flink ones, based on the mappings available in this class.
+ */
+class FlinkStreamingTransformTranslators {
+
+  // --------------------------------------------------------------------------------------------
+  //  Transform Translator Registry
+  // --------------------------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private static final Map<
+      Class<? extends PTransform>,
+      FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+
+  // All of the available translators are registered here.
+  static {
+    TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
+    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
+    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
+
+    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
+        new CreateViewStreamingTranslator());
+
+    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+  }
+
+  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Transformation Implementations
+  // --------------------------------------------------------------------------------------------
+
+  private static class TextIOWriteBoundStreamingTranslator
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+
+    @Override
+    public void translateNode(
+        TextIO.Write.Bound transform,
+        FlinkStreamingTranslationContext context) {
+      PValue input = context.getInput(transform);
+      DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
+
+      String filenamePrefix = transform.getFilenamePrefix();
+      String filenameSuffix = transform.getFilenameSuffix();
+      boolean needsValidation = transform.needsValidation();
+      int numShards = transform.getNumShards();
+      String shardNameTemplate = transform.getShardNameTemplate();
+
+      // TODO: Implement these. We need Flink support for this.
+      LOG.warn(
+          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+          needsValidation);
+      LOG.warn(
+          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+          filenameSuffix);
+      LOG.warn(
+          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+          shardNameTemplate);
+
+      DataStream<String> dataSink = inputDataStream
+          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
+            @Override
+            public void flatMap(
+                WindowedValue<String> value,
+                Collector<String> out)
+                throws Exception {
+              out.collect(value.getValue());
+            }
+          });
+      DataStreamSink<String> output =
+          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
+
+      if (numShards > 0) {
+        output.setParallelism(numShards);
+      }
+    }
+  }
+
+  private static class WriteSinkStreamingTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+
+    @Override
+    public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+      String name = transform.getName();
+      PValue input = context.getInput(transform);
+
+      Sink<T> sink = transform.getSink();
+      if (!(sink instanceof UnboundedFlinkSink)) {
+        throw new UnsupportedOperationException(
+            "At the time, only unbounded Flink sinks are supported.");
+      }
+
+      DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
+
+      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
+        @Override
+        public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
+          out.collect(value.getValue());
+        }
+      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
+    }
+  }
+
+  private static class UnboundedReadSourceTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+
+    @Override
+    public void translateNode(
+        Read.Unbounded<T> transform,
+        FlinkStreamingTranslationContext context) {
+      PCollection<T> output = context.getOutput(transform);
+
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataStream<WindowedValue<T>> source;
+      if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
+        @SuppressWarnings("unchecked")
+        UnboundedFlinkSource<T> flinkSourceFunction =
+            (UnboundedFlinkSource<T>) transform.getSource();
+
+        final AssignerWithPeriodicWatermarks<T> flinkAssigner =
+            flinkSourceFunction.getFlinkTimestampAssigner();
+
+        DataStream<T> flinkSource = context.getExecutionEnvironment()
+            .addSource(flinkSourceFunction.getFlinkSource());
+
+        flinkSourceFunction.setCoder(
+            new FlinkCoder<T>(flinkSource.getType(),
+              context.getExecutionEnvironment().getConfig()));
+
+        source = flinkSource
+            .assignTimestampsAndWatermarks(flinkAssigner)
+            .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
+              @Override
+              public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
+                collector.collect(
+                    WindowedValue.of(
+                        s,
+                        new Instant(flinkAssigner.extractTimestamp(s, -1)),
+                        GlobalWindow.INSTANCE,
+                        PaneInfo.NO_FIRING));
+              }}).returns(outputTypeInfo);
+      } else {
+        try {
+          UnboundedSourceWrapper<T, ?> sourceWrapper =
+              new UnboundedSourceWrapper<>(
+                  context.getPipelineOptions(),
+                  transform.getSource(),
+                  context.getExecutionEnvironment().getParallelism());
+          source = context
+              .getExecutionEnvironment()
+              .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              "Error while translating UnboundedSource: " + transform.getSource(), e);
+        }
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }
+
+  private static class BoundedReadSourceTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(
+        Read.Bounded<T> transform,
+        FlinkStreamingTranslationContext context) {
+      PCollection<T> output = context.getOutput(transform);
+
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+
+      DataStream<WindowedValue<T>> source;
+      try {
+        BoundedSourceWrapper<T> sourceWrapper =
+            new BoundedSourceWrapper<>(
+                context.getPipelineOptions(),
+                transform.getSource(),
+                context.getExecutionEnvironment().getParallelism());
+        source = context
+            .getExecutionEnvironment()
+            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error while translating BoundedSource: " + transform.getSource(), e);
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }
+
+  private static void rejectSplittable(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+    if (signature.processElement().isSplittable()) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s does not currently support splittable DoFn: %s",
+              FlinkRunner.class.getSimpleName(), doFn));
+    }
+  }
+
+  private static class ParDoBoundStreamingTranslator<InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        ParDo.Bound<InputT, OutputT>> {
+
+    @Override
+    public void translateNode(
+        ParDo.Bound<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DoFn<InputT, OutputT> doFn = transform.getFn();
+      rejectSplittable(doFn);
+
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getOutput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<OutputT>> typeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      @SuppressWarnings("unchecked")
+      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+
+      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection);
+
+      DataStream<WindowedValue<InputT>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+      Coder keyCoder = null;
+      boolean stateful = false;
+      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+      if (signature.stateDeclarations().size() > 0
+          || signature.timerDeclarations().size() > 0) {
+        // Based on the fact that the signature is stateful, DoFnSignatures ensures
+        // that it is also keyed
+        keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder();
+        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+        stateful = true;
+      }
+
+      if (sideInputs.isEmpty()) {
+        DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputCoder,
+                new TupleTag<OutputT>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                keyCoder);
+
+        SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
+            .transform(transform.getName(), typeInfo, doFnOperator);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputCoder,
+                new TupleTag<OutputT>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
+                windowingStrategy,
+                transformedSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions(),
+                keyCoder);
+
+        SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream;
+        if (stateful) {
+          // we have to manually construct the two-input transform because we are
+          // normally not allowed to have only one keyed input.
+          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
+          TwoInputTransformation<
+              WindowedValue<KV<?, InputT>>,
+              RawUnionValue,
+              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation<>(
+              keyedStream.getTransformation(),
+              transformedSideInputs.f1.broadcast().getTransformation(),
+              transform.getName(),
+              (TwoInputStreamOperator) doFnOperator,
+              typeInfo,
+              keyedStream.getParallelism());
+
+          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
+          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
+
+          outDataStream = new SingleOutputStreamOperator(
+                  keyedStream.getExecutionEnvironment(),
+                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+        } else {
+          outDataStream = inputDataStream
+              .connect(transformedSideInputs.f1.broadcast())
+              .transform(transform.getName(), typeInfo, doFnOperator);
+        }
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      }
+    }
+  }
+
+  /**
+   * Wraps each element in a {@link RawUnionValue} with the given tag id.
+   */
+  private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
+    private final int intTag;
+
+    public ToRawUnion(int intTag) {
+      this.intTag = intTag;
+    }
+
+    @Override
+    public RawUnionValue map(T o) throws Exception {
+      return new RawUnionValue(intTag, o);
+    }
+  }
+
+  private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
+        transformSideInputs(
+          Collection<PCollectionView<?>> sideInputs,
+          FlinkStreamingTranslationContext context) {
+
+    // collect all side inputs
+    Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+    Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+    int count = 0;
+    for (PCollectionView<?> sideInput: sideInputs) {
+      TupleTag<?> tag = sideInput.getTagInternal();
+      intToViewMapping.put(count, sideInput);
+      tagToIntMapping.put(tag, count);
+      count++;
+      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
+    }
+
+
+    List<Coder<?>> inputCoders = new ArrayList<>();
+    for (PCollectionView<?> sideInput: sideInputs) {
+      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+      TypeInformation<Object> tpe = sideInputStream.getType();
+      if (!(tpe instanceof CoderTypeInformation)) {
+        throw new IllegalStateException(
+            "Input Stream TypeInformation is no CoderTypeInformation.");
+      }
+
+      Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
+      inputCoders.add(coder);
+    }
+
+    UnionCoder unionCoder = UnionCoder.of(inputCoders);
+
+    CoderTypeInformation<RawUnionValue> unionTypeInformation =
+        new CoderTypeInformation<>(unionCoder);
+
+    // transform each side input to RawUnionValue and union them
+    DataStream<RawUnionValue> sideInputUnion = null;
+
+    for (PCollectionView<?> sideInput: sideInputs) {
+      TupleTag<?> tag = sideInput.getTagInternal();
+      final int intTag = tagToIntMapping.get(tag);
+      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+      DataStream<RawUnionValue> unionValueStream =
+          sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
+
+      if (sideInputUnion == null) {
+        sideInputUnion = unionValueStream;
+      } else {
+        sideInputUnion = sideInputUnion.union(unionValueStream);
+      }
+    }
+
+    if (sideInputUnion == null) {
+      throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
+    }
+
+    return new Tuple2<>(intToViewMapping, sideInputUnion);
+  }
+
+
+  private static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      ParDo.BoundMulti<InputT, OutputT>> {
+
+    @Override
+    public void translateNode(
+        ParDo.BoundMulti<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DoFn<InputT, OutputT> doFn = transform.getFn();
+      rejectSplittable(doFn);
+
+      // we assume that the transformation does not change the windowing strategy.
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      List<TaggedPValue> outputs = context.getOutputs(transform);
+
+      Map<TupleTag<?>, Integer> tagsToLabels =
+          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs);
+
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+
+      @SuppressWarnings("unchecked")
+      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+
+      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection);
+
+      DataStream<WindowedValue<InputT>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+      Coder keyCoder = null;
+      boolean stateful = false;
+      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+      if (signature.stateDeclarations().size() > 0
+          || signature.timerDeclarations().size() > 0) {
+        // Based on the fact that the signature is stateful, DoFnSignatures ensures
+        // that it is also keyed
+        keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder();
+        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+        stateful = true;
+      }
+
+      if (sideInputs.isEmpty()) {
+        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputCoder,
+                transform.getMainOutputTag(),
+                transform.getSideOutputTags().getAll(),
+                new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                keyCoder);
+
+        UnionCoder outputUnionCoder = createUnionCoder(outputs);
+
+        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+            new CoderTypeInformation<>(outputUnionCoder);
+
+        unionOutputStream = inputDataStream
+            .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+            new DoFnOperator<>(
+                transform.getFn(),
+                inputCoder,
+                transform.getMainOutputTag(),
+                transform.getSideOutputTags().getAll(),
+                new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                windowingStrategy,
+                transformedSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions(),
+                keyCoder);
+
+        UnionCoder outputUnionCoder = createUnionCoder(outputs);
+
+        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+            new CoderTypeInformation<>(outputUnionCoder);
+
+        if (stateful) {
+          // we have to manually construct the two-input transform because we are
+          // normally not allowed to have only one keyed input.
+          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
+          TwoInputTransformation<
+              WindowedValue<KV<?, InputT>>,
+              RawUnionValue,
+              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
+              keyedStream.getTransformation(),
+              transformedSideInputs.f1.broadcast().getTransformation(),
+              transform.getName(),
+              (TwoInputStreamOperator) doFnOperator,
+              outputUnionTypeInformation,
+              keyedStream.getParallelism());
+
+          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
+          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
+
+          unionOutputStream = new SingleOutputStreamOperator(
+                  keyedStream.getExecutionEnvironment(),
+                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        } else {
+          unionOutputStream = inputDataStream
+              .connect(transformedSideInputs.f1.broadcast())
+              .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+        }
+      }
+
+      SplitStream<RawUnionValue> splitStream = unionOutputStream
+              .split(new OutputSelector<RawUnionValue>() {
+                @Override
+                public Iterable<String> select(RawUnionValue value) {
+                  return Collections.singletonList(Integer.toString(value.getUnionTag()));
+                }
+              });
+
+      for (TaggedPValue output : outputs) {
+        final int outputTag = tagsToLabels.get(output.getTag());
+
+        TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
+
+        @SuppressWarnings("unchecked")
+        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
+          .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+            @Override
+            public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+              out.collect(value.getValue());
+            }
+          }).returns(outputTypeInfo);
+
+        context.setOutputDataStream(output.getValue(), unwrapped);
+      }
+    }
+
+    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+        TupleTag<?> mainTag,
+        List<TaggedPValue> allTaggedValues) {
+
+      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+      int count = 0;
+      tagToLabelMap.put(mainTag, count++);
+      for (TaggedPValue taggedPValue : allTaggedValues) {
+        if (!tagToLabelMap.containsKey(taggedPValue.getTag())) {
+          tagToLabelMap.put(taggedPValue.getTag(), count++);
+        }
+      }
+      return tagToLabelMap;
+    }
+
+    private UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
+      List<Coder<?>> outputCoders = Lists.newArrayList();
+      for (TaggedPValue taggedColl : taggedCollections) {
+        checkArgument(
+            taggedColl.getValue() instanceof PCollection,
+            "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
+            PCollection.class.getSimpleName(),
+            taggedColl.getValue().getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedColl.getValue();
+        WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
+            WindowedValue.getFullCoder(
+                coll.getCoder(),
+                coll.getWindowingStrategy().getWindowFn().windowCoder());
+        outputCoders.add(windowedValueCoder);
+      }
+      return UnionCoder.of(outputCoders);
+    }
+  }
+
+  private static class CreateViewStreamingTranslator<ElemT, ViewT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
+
+    @Override
+    public void translateNode(
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+        FlinkStreamingTranslationContext context) {
+      // just forward
+      DataStream<WindowedValue<List<ElemT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      PCollectionView<ViewT> view = context.getOutput(transform);
+
+      context.setOutputDataStream(view, inputDataSet);
+    }
+  }
+
+  private static class WindowBoundTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+
+    @Override
+    public void translateNode(
+        Window.Bound<T> transform,
+        FlinkStreamingTranslationContext context) {
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<T, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<T>> typeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataStream<WindowedValue<T>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+
+      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+          new FlinkAssignWindows<>(windowFn);
+
+      SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream
+          .flatMap(assignWindowsFunction)
+          .name(context.getOutput(transform).getName())
+          .returns(typeInfo);
+
+      context.setOutputDataStream(context.getOutput(transform), outputDataStream);
+    }
+  }
+
+  private static class ReshuffleTranslatorStreaming<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Reshuffle<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
+
+    }
+  }
+
+
+  private static class GroupByKeyTranslator<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        GroupByKey<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+      WindowedValue.
+          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getFullCoder(
+              workItemCoder,
+              input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+          WindowedValue<
+              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
+          SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+
+      TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DoFnOperator.DefaultOutputManagerFactory<
+            WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
+          new DoFnOperator.DefaultOutputManagerFactory<>();
+
+      WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
+          new WindowDoFnOperator<>(
+              reduceFn,
+              (Coder) windowedWorkItemCoder,
+              new TupleTag<KV<K, Iterable<InputT>>>("main output"),
+              Collections.<TupleTag<?>>emptyList(),
+              outputManagerFactory,
+              windowingStrategy,
+              new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+              Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+              context.getPipelineOptions(),
+              inputKvCoder.getKeyCoder());
+
+      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+      @SuppressWarnings("unchecked")
+      SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
+          keyedWorkItemStream
+              .transform(
+                  transform.getName(),
+                  outputTypeInfo,
+                  (OneInputStreamOperator) doFnOperator);
+
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+
+    }
+  }
+
+  private static class CombinePerKeyTranslator<K, InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      Combine.PerKey<K, InputT, OutputT>> {
+
+    @Override
+    boolean canTranslate(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      // if we have a merging window strategy and side inputs we cannot
+      // translate as a proper combine. We have to group and then run the combine
+      // over the final grouped values.
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
+    }
+
+    @Override
+    public void translateNode(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+      WindowedValue.
+          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+            WindowedValue.getFullCoder(
+                workItemCoder,
+                input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+            WindowedValue<
+                SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining(
+          inputKvCoder.getKeyCoder(),
+          AppliedCombineFn.withInputCoder(
+              transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
+
+      TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      if (sideInputs.isEmpty()) {
+
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                (Coder) windowedWorkItemCoder,
+                new TupleTag<KV<K, OutputT>>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder());
+
+        // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+        // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+        @SuppressWarnings("unchecked")
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+            keyedWorkItemStream.transform(
+                transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                (Coder) windowedWorkItemCoder,
+                new TupleTag<KV<K, OutputT>>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                windowingStrategy,
+                transformSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder());
+
+        // we have to manually construct the two-input transform because we are
+        // normally not allowed to have only one keyed input.
+
+        TwoInputTransformation<
+            WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+            RawUnionValue,
+            WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
+            keyedWorkItemStream.getTransformation(),
+            transformSideInputs.f1.broadcast().getTransformation(),
+            transform.getName(),
+            (TwoInputStreamOperator) doFnOperator,
+            outputTypeInfo,
+            keyedWorkItemStream.getParallelism());
+
+        rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+        rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+            new SingleOutputStreamOperator(
+                keyedWorkItemStream.getExecutionEnvironment(),
+                rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+        keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      }
+    }
+
+    private static class ToKeyedWorkItem<K, InputT>
+        extends RichFlatMapFunction<
+          WindowedValue<KV<K, InputT>>,
+          WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+      @Override
+      public void flatMap(
+          WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+          Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+        // for now we need to wrap each element in one work item per window,
+        // since otherwise the PushbackSideInputRunner will not correctly
+        // determine whether side inputs are ready
+        for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+          SingletonKeyedWorkItem<K, InputT> workItem =
+              new SingletonKeyedWorkItem<>(
+                  in.getValue().getKey(),
+                  in.withValue(in.getValue().getValue()));
+
+          out.collect(in.withValue(workItem));
+        }
+      }
+    }
+  }
+
+  private static class FlattenPCollectionTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        Flatten.FlattenPCollectionList<T>> {
+
+    @Override
+    public void translateNode(
+        Flatten.FlattenPCollectionList<T> transform,
+        FlinkStreamingTranslationContext context) {
+      List<TaggedPValue> allInputs = context.getInputs(transform);
+
+      if (allInputs.isEmpty()) {
+
+        // create a dummy source to satisfy downstream operations; we cannot
+        // create an empty source in Flink, so we add a flatMap that simply
+        // never forwards the single element
+        DataStreamSource<String> dummySource =
+            context.getExecutionEnvironment().fromElements("dummy");
+
+        DataStream<WindowedValue<T>> result = dummySource.flatMap(
+            new FlatMapFunction<String, WindowedValue<T>>() {
+              @Override
+              public void flatMap(
+                  String s,
+                  Collector<WindowedValue<T>> collector) throws Exception {
+                // never return anything
+              }
+            }).returns(
+            new CoderTypeInformation<>(
+                WindowedValue.getFullCoder(
+                    (Coder<T>) VoidCoder.of(),
+                    GlobalWindow.Coder.INSTANCE)));
+        context.setOutputDataStream(context.getOutput(transform), result);
+
+      } else {
+        DataStream<T> result = null;
+        for (TaggedPValue input : allInputs) {
+          DataStream<T> current = context.getInputDataStream(input.getValue());
+          result = (result == null) ? current : result.union(current);
+        }
+        context.setOutputDataStream(context.getOutput(transform), result);
+      }
+    }
+  }
+}
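The registry at the top of this file is a class-keyed dispatch table: the pipeline translator looks up a StreamTransformTranslator by the concrete PTransform class and delegates to its translateNode(). A minimal sketch of that dispatch follows; the applyStreamingTransform helper and its visitor plumbing are illustrative assumptions, not part of this commit (compare the deleted FlinkBatchPipelineTranslator at the end of this message, which dispatches the same way for batch):

    // Sketch of the class-keyed dispatch; applyStreamingTransform is a
    // hypothetical helper, not part of this commit.
    private void applyStreamingTransform(
        TransformHierarchy.Node node, FlinkStreamingTranslationContext context) {
      PTransform<?, ?> transform = node.getTransform();
      @SuppressWarnings("unchecked")
      FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>> translator =
          (FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>)
              FlinkStreamingTransformTranslators.getTranslator(transform);
      if (translator == null) {
        throw new UnsupportedOperationException(
            "The transform " + transform + " is currently not supported.");
      }
      // expose the node's inputs/outputs to the translator via the context
      context.setCurrentTransform(node.toAppliedPTransform());
      translator.translateNode(transform, context);
    }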

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
new file mode 100644
index 0000000..3d5b83f
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Helper for keeping track of which {@link DataStream DataStreams} map
+ * to which {@link PTransform PTransforms}.
+ */
+class FlinkStreamingTranslationContext {
+
+  private final StreamExecutionEnvironment env;
+  private final PipelineOptions options;
+
+  /**
+   * Keeps a mapping between the output value of the PTransform (in Beam) and the
+   * Flink operator that produced it, after the translation of the corresponding PTransform
+   * to its Flink equivalent.
+   */
+  private final Map<PValue, DataStream<?>> dataStreams;
+
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
+    this.env = checkNotNull(env);
+    this.options = checkNotNull(options);
+    this.dataStreams = new HashMap<>();
+  }
+
+  public StreamExecutionEnvironment getExecutionEnvironment() {
+    return env;
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> DataStream<T> getInputDataStream(PValue value) {
+    return (DataStream<T>) dataStreams.get(value);
+  }
+
+  public void setOutputDataStream(PValue value, DataStream<?> set) {
+    if (!dataStreams.containsKey(value)) {
+      dataStreams.put(value, set);
+    }
+  }
+
+  /**
+   * Sets the {@link AppliedPTransform} which carries input/output.
+   * @param currentTransform the transform currently being translated
+   */
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+  public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+
+    return WindowedValue.getFullCoder(
+        valueCoder,
+        collection.getWindowingStrategy().getWindowFn().windowCoder());
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            valueCoder,
+            collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+    return new CoderTypeInformation<>(windowedValueCoder);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
+  }
+
+  public <T extends PInput> List<TaggedPValue> getInputs(PTransform<T, ?> transform) {
+    return currentTransform.getInputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
+  }
+
+  public <OutputT extends POutput> List<TaggedPValue> getOutputs(PTransform<?, OutputT> transform) {
+    return currentTransform.getOutputs();
+  }
+
+}
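To make the context's contract concrete, a minimal pass-through translator might use it as sketched below. The IdentityTranslator class is hypothetical, but every context method it calls (getInput, getOutput, getInputDataStream, setOutputDataStream) is defined above:

    // Hypothetical pass-through translator illustrating the typical protocol:
    // resolve the input via the current AppliedPTransform, fetch its
    // DataStream, and register the (here unchanged) stream as the output.
    private static class IdentityTranslator<T>
        extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
            PTransform<PCollection<T>, PCollection<T>>> {

      @Override
      public void translateNode(
          PTransform<PCollection<T>, PCollection<T>> transform,
          FlinkStreamingTranslationContext context) {
        DataStream<WindowedValue<T>> input =
            context.getInputDataStream(context.getInput(transform));
        context.setOutputDataStream(context.getOutput(transform), input);
      }
    }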

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
index 0a9df4e..0ff6367 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 /**
  * Flink streaming overrides for various view (side input) transforms.
  */
-public class FlinkStreamingViewOverrides {
+class FlinkStreamingViewOverrides {
 
   /**
    * Specialized implementation for

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
new file mode 100644
index 0000000..3acc3ea
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
+ */
+class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
+
+  private TranslationMode translationMode;
+
+  private final FlinkPipelineOptions options;
+
+  public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
+    this.translationMode = defaultMode;
+    this.options = options;
+  }
+
+  public TranslationMode getTranslationMode() {
+
+    // a user-specified streaming option overrides the detected translation mode
+    if (options.isStreaming()) {
+      return TranslationMode.STREAMING;
+    }
+
+    return translationMode;
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    Class<? extends PTransform> transformClass = node.getTransform().getClass();
+    if (transformClass == Read.Unbounded.class) {
+      LOG.info("Found {}. Switching to streaming execution.", transformClass);
+      translationMode = TranslationMode.STREAMING;
+    }
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+}
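The runner presumably drives this optimizer before building the Flink job; the exact call site is not part of this diff, so the following is only a sketch under that assumption:

    // Sketch: detect the translation mode, then translate accordingly.
    // translate(Pipeline) is inherited from FlinkPipelineTranslator, which
    // walks the pipeline and invokes the visitor methods above.
    PipelineTranslationOptimizer optimizer =
        new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
    optimizer.translate(pipeline);
    TranslationMode mode = optimizer.getTranslationMode();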

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
new file mode 100644
index 0000000..ad54750
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+/**
+ * The translation mode of the Beam Pipeline.
+ */
+enum TranslationMode {
+
+  /** Uses the batch mode of Flink. */
+  BATCH,
+
+  /** Uses the streaming mode of Flink. */
+  STREAMING
+
+}
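A runner would typically branch on this enum when setting up the execution environment; a minimal sketch, with hypothetical helper methods:

    // Sketch only: createBatchExecutionEnvironment() and
    // createStreamingExecutionEnvironment() are assumed helpers.
    switch (mode) {
      case BATCH:
        return createBatchExecutionEnvironment(options);
      case STREAMING:
        return createStreamingExecutionEnvironment(options);
      default:
        throw new IllegalArgumentException("Unknown translation mode: " + mode);
    }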

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 7fb17c8..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
- * Flink batch job.
- */
-public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
-
-  /**
-   * The necessary context in the case of a batch job.
-   */
-  private final FlinkBatchTranslationContext batchContext;
-
-  private int depth = 0;
-
-  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
-    this.batchContext = new FlinkBatchTranslationContext(env, options);
-  }
-
-  @Override
-  @SuppressWarnings("rawtypes, unchecked")
-  public void translate(Pipeline pipeline) {
-    super.translate(pipeline);
-
-    // terminate dangling DataSets
-    for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) {
-      dataSet.output(new DiscardingOutputFormat());
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // --------------------------------------------------------------------------------------------
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-    this.depth++;
-
-    BatchTransformTranslator<?> translator = getTranslator(node);
-
-    if (translator != null) {
-      applyBatchTransform(node.getTransform(), node, translator);
-      LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
-      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-    } else {
-      return CompositeBehavior.ENTER_TRANSFORM;
-    }
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    this.depth--;
-    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
-
-    // get the transformation corresponding to the node we are
-    // currently visiting and translate it into its Flink alternative.
-    PTransform<?, ?> transform = node.getTransform();
-    BatchTransformTranslator<?> translator =
-        FlinkBatchTransformTranslators.getTranslator(transform);
-    if (translator == null) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform
-          + " is currently not supported.");
-    }
-    applyBatchTransform(transform, node, translator);
-  }
-
-  private <T extends PTransform<?, ?>> void applyBatchTransform(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      BatchTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the batchContext
-    batchContext.setCurrentTransform(node.toAppliedPTransform());
-    typedTranslator.translateNode(typedTransform, batchContext);
-  }
-
-  /**
-   * A translator of a {@link PTransform}.
-   */
-  public interface BatchTransformTranslator<TransformT extends PTransform> {
-    void translateNode(TransformT transform, FlinkBatchTranslationContext context);
-  }
-
-  /**
-   * Returns a translator for the given node, if it is possible, otherwise null.
-   */
-  private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
-    PTransform<?, ?> transform = node.getTransform();
-
-    // Root of the graph is null
-    if (transform == null) {
-      return null;
-    }
-
-    return FlinkBatchTransformTranslators.getTranslator(transform);
-  }
-}

