beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/6] beam git commit: Move Flink Runner classes to base package and make package private
Date Wed, 22 Feb 2017 17:29:19 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
deleted file mode 100644
index 29ba9a6..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ /dev/null
@@ -1,775 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
-import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.join.UnionCoder;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.GroupCombineOperator;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.util.Collector;
-
-/**
- * Translators for transforming {@link PTransform PTransforms} to
- * Flink {@link DataSet DataSets}.
- */
-class FlinkBatchTransformTranslators {
-
-  // --------------------------------------------------------------------------------------------
-  //  Transform Translator Registry
-  // --------------------------------------------------------------------------------------------
-
-  @SuppressWarnings("rawtypes")
-  private static final Map<
-      Class<? extends PTransform>,
-      FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
-
-  static {
-    TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
-
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
-
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
-
-    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
-
-    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
-    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
-
-    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
-  }
-
-
-  static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
-      PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
-  }
-
-  private static class ReadSourceTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
-
-    @Override
-    public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      BoundedSource<T> source = transform.getSource();
-      PCollection<T> output = context.getOutput(transform);
-
-      TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
-
-      DataSource<WindowedValue<T>> dataSource = new DataSource<>(
-          context.getExecutionEnvironment(),
-          new SourceInputFormat<>(source, context.getPipelineOptions()),
-          typeInformation,
-          name);
-
-      context.setOutputDataSet(output, dataSource);
-    }
-  }
-
-  private static class WindowBoundTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
-
-    @Override
-    public void translateNode(Window.Bound<T> transform, FlinkBatchTranslationContext context) {
-      PValue input = context.getInput(transform);
-
-      TypeInformation<WindowedValue<T>> resultTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
-      @SuppressWarnings("unchecked")
-      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
-          (WindowingStrategy<T, ? extends BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
-
-      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
-
-      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
-          new FlinkAssignWindows<>(windowFn);
-
-      DataSet<WindowedValue<T>> resultDataSet = inputDataSet
-          .flatMap(assignWindowsFunction)
-          .name(context.getOutput(transform).getName())
-          .returns(resultTypeInfo);
-
-      context.setOutputDataSet(context.getOutput(transform), resultDataSet);
-    }
-  }
-
-  private static class GroupByKeyTranslatorBatch<K, InputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
-
-    @Override
-    public void translateNode(
-        GroupByKey<K, InputT> transform,
-        FlinkBatchTranslationContext context) {
-
-      // for now, this is copied from the Combine.PerKey translater. Once we have the new runner API
-      // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn
-
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
-          new Concatenate<InputT>().asKeyedFn();
-
-      KvCoder<K, InputT> inputCoder =
-          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
-
-      Coder<List<InputT>> accumulatorCoder;
-
-      try {
-        accumulatorCoder =
-            combineFn.getAccumulatorCoder(
-                context.getInput(transform).getPipeline().getCoderRegistry(),
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder());
-      } catch (CannotProvideCoderException e) {
-        throw new RuntimeException(e);
-      }
-
-      WindowingStrategy<?, ?> windowingStrategy =
-          context.getInput(transform).getWindowingStrategy();
-
-      TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
-          new CoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-                  windowingStrategy.getWindowFn().windowCoder()));
-
-
-      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
-          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
-      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
-      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
-
-      if (windowingStrategy.getWindowFn().isNonMerging()) {
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, BoundedWindow> boundedStrategy =
-            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
-        partialReduceFunction = new FlinkPartialReduceFunction<>(
-            combineFn,
-            boundedStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-
-        reduceFunction = new FlinkReduceFunction<>(
-            combineFn,
-            boundedStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-
-      } else {
-        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-          throw new UnsupportedOperationException(
-              "Merging WindowFn with windows other than IntervalWindow are not supported.");
-        }
-
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, IntervalWindow> intervalStrategy =
-            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
-        partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
-            combineFn,
-            intervalStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-
-        reduceFunction = new FlinkMergingReduceFunction<>(
-            combineFn,
-            intervalStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-      }
-
-      // Partially GroupReduce the values into the intermediate format AccumT (combine)
-      GroupCombineOperator<
-          WindowedValue<KV<K, InputT>>,
-          WindowedValue<KV<K, List<InputT>>>> groupCombine =
-          new GroupCombineOperator<>(
-              inputGrouping,
-              partialReduceTypeInfo,
-              partialReduceFunction,
-              "GroupCombine: " + transform.getName());
-
-      Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
-          groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
-
-      // Fully reduce the values and create output format VO
-      GroupReduceOperator<
-          WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
-          new GroupReduceOperator<>(
-              intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-
-    }
-
-  }
-
-  private static class ReshuffleTranslatorBatch<K, InputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
-
-    @Override
-    public void translateNode(
-        Reshuffle<K, InputT> transform,
-        FlinkBatchTranslationContext context) {
-
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
-
-    }
-
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
-   * is expected to crash!
-   *
-   * <p>This is copied from the dataflow runner code.
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-
-  private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          Combine.PerKey<K, InputT, OutputT>> {
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void translateNode(
-        Combine.PerKey<K, InputT, OutputT> transform,
-        FlinkBatchTranslationContext context) {
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
-          (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
-
-      KvCoder<K, InputT> inputCoder =
-          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
-
-      Coder<AccumT> accumulatorCoder;
-
-      try {
-        accumulatorCoder =
-            combineFn.getAccumulatorCoder(
-                context.getInput(transform).getPipeline().getCoderRegistry(),
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder());
-      } catch (CannotProvideCoderException e) {
-        throw new RuntimeException(e);
-      }
-
-      WindowingStrategy<?, ?> windowingStrategy =
-          context.getInput(transform).getWindowingStrategy();
-
-      TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
-          context.getTypeInfo(
-              KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-              windowingStrategy);
-
-      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
-          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
-      // construct a map from side input to WindowingStrategy so that
-      // the OldDoFn runner can map main-input windows to side input windows
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
-      for (PCollectionView<?> sideInput: transform.getSideInputs()) {
-        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
-      }
-
-      if (windowingStrategy.getWindowFn().isNonMerging()) {
-        WindowingStrategy<?, BoundedWindow> boundedStrategy =
-            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
-        FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
-            new FlinkPartialReduceFunction<>(
-                combineFn,
-                boundedStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
-
-        FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
-            new FlinkReduceFunction<>(
-                combineFn,
-                boundedStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
-
-        // Partially GroupReduce the values into the intermediate format AccumT (combine)
-        GroupCombineOperator<
-            WindowedValue<KV<K, InputT>>,
-            WindowedValue<KV<K, AccumT>>> groupCombine =
-            new GroupCombineOperator<>(
-                inputGrouping,
-                partialReduceTypeInfo,
-                partialReduceFunction,
-                "GroupCombine: " + transform.getName());
-
-        transformSideInputs(transform.getSideInputs(), groupCombine, context);
-
-        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
-            context.getTypeInfo(context.getOutput(transform));
-
-        Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
-            groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
-
-        // Fully reduce the values and create output format OutputT
-        GroupReduceOperator<
-            WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
-            new GroupReduceOperator<>(
-                intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
-
-        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
-        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-
-      } else {
-        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-          throw new UnsupportedOperationException(
-              "Merging WindowFn with windows other than IntervalWindow are not supported.");
-        }
-
-        // for merging windows we can't to a pre-shuffle combine step since
-        // elements would not be in their correct windows for side-input access
-
-        WindowingStrategy<?, IntervalWindow> intervalStrategy =
-            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
-        FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
-            new FlinkMergingNonShuffleReduceFunction<>(
-                combineFn,
-                intervalStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
-
-        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
-            context.getTypeInfo(context.getOutput(transform));
-
-        Grouping<WindowedValue<KV<K, InputT>>> grouping =
-            inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
-        // Fully reduce the values and create output format OutputT
-        GroupReduceOperator<
-            WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
-            new GroupReduceOperator<>(
-                grouping, reduceTypeInfo, reduceFunction, transform.getName());
-
-        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
-        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-      }
-
-
-    }
-  }
-
-  private static void rejectSplittable(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    if (signature.processElement().isSplittable()) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support splittable DoFn: %s",
-              FlinkRunner.class.getSimpleName(), doFn));
-    }
-  }
-
-  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-
-    if (signature.stateDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-              DoFn.StateId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              FlinkRunner.class.getSimpleName()));
-    }
-
-    if (signature.timerDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
-              DoFn.TimerId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              FlinkRunner.class.getSimpleName()));
-    }
-  }
-
-  private static class ParDoBoundTranslatorBatch<InputT, OutputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          ParDo.Bound<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(
-        ParDo.Bound<InputT, OutputT> transform,
-
-        FlinkBatchTranslationContext context) {
-      DoFn<InputT, OutputT> doFn = transform.getFn();
-      rejectSplittable(doFn);
-      rejectStateAndTimers(doFn);
-
-      DataSet<WindowedValue<InputT>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      TypeInformation<WindowedValue<OutputT>> typeInformation =
-          context.getTypeInfo(context.getOutput(transform));
-
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-
-      // construct a map from side input to WindowingStrategy so that
-      // the OldDoFn runner can map main-input windows to side input windows
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
-      for (PCollectionView<?> sideInput: sideInputs) {
-        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
-      }
-
-      FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
-          new FlinkDoFnFunction<>(
-              doFn,
-              context.getOutput(transform).getWindowingStrategy(),
-              sideInputStrategies,
-              context.getPipelineOptions());
-
-      MapPartitionOperator<WindowedValue<InputT>, WindowedValue<OutputT>> outputDataSet =
-          new MapPartitionOperator<>(
-              inputDataSet,
-              typeInformation,
-              doFnWrapper,
-              transform.getName());
-
-      transformSideInputs(sideInputs, outputDataSet, context);
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
-  private static class ParDoBoundMultiTranslatorBatch<InputT, OutputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          ParDo.BoundMulti<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(
-        ParDo.BoundMulti<InputT, OutputT> transform,
-        FlinkBatchTranslationContext context) {
-      DoFn<InputT, OutputT> doFn = transform.getFn();
-      rejectSplittable(doFn);
-      rejectStateAndTimers(doFn);
-      DataSet<WindowedValue<InputT>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      List<TaggedPValue> outputs = context.getOutputs(transform);
-
-      Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
-      // put the main output at index 0, FlinkMultiOutputDoFnFunction  expects this
-      outputMap.put(transform.getMainOutputTag(), 0);
-      int count = 1;
-      for (TaggedPValue taggedValue : outputs) {
-        if (!outputMap.containsKey(taggedValue.getTag())) {
-          outputMap.put(taggedValue.getTag(), count++);
-        }
-      }
-
-      // assume that the windowing strategy is the same for all outputs
-      WindowingStrategy<?, ?> windowingStrategy = null;
-
-      // collect all output Coders and create a UnionCoder for our tagged outputs
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (TaggedPValue taggedValue : outputs) {
-        checkState(
-            taggedValue.getValue() instanceof PCollection,
-            "Within ParDo, got a non-PCollection output %s of type %s",
-            taggedValue.getValue(),
-            taggedValue.getValue().getClass().getSimpleName());
-        PCollection<?> coll = (PCollection<?>) taggedValue.getValue();
-        outputCoders.add(coll.getCoder());
-        windowingStrategy = coll.getWindowingStrategy();
-      }
-
-      if (windowingStrategy == null) {
-        throw new IllegalStateException("No outputs defined.");
-      }
-
-      UnionCoder unionCoder = UnionCoder.of(outputCoders);
-
-      TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
-          new CoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  unionCoder,
-                  windowingStrategy.getWindowFn().windowCoder()));
-
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-
-      // construct a map from side input to WindowingStrategy so that
-      // the OldDoFn runner can map main-input windows to side input windows
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
-      for (PCollectionView<?> sideInput: sideInputs) {
-        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
-      }
-
-      @SuppressWarnings("unchecked")
-      FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
-          new FlinkMultiOutputDoFnFunction(
-              doFn,
-              windowingStrategy,
-              sideInputStrategies,
-              context.getPipelineOptions(),
-              outputMap);
-
-      MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet =
-          new MapPartitionOperator<>(
-              inputDataSet,
-              typeInformation,
-              doFnWrapper,
-              transform.getName());
-
-      transformSideInputs(sideInputs, taggedDataSet, context);
-
-      for (TaggedPValue output : outputs) {
-        pruneOutput(
-            taggedDataSet,
-            context,
-            outputMap.get(output.getTag()),
-            (PCollection) output.getValue());
-      }
-    }
-
-    private <T> void pruneOutput(
-        MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet,
-        FlinkBatchTranslationContext context,
-        int integerTag,
-        PCollection<T> collection) {
-      TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
-
-      FlinkMultiOutputPruningFunction<T> pruningFunction =
-          new FlinkMultiOutputPruningFunction<>(integerTag);
-
-      FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
-          new FlatMapOperator<>(
-              taggedDataSet,
-              outputType,
-              pruningFunction,
-              collection.getName());
-
-      context.setOutputDataSet(collection, pruningOperator);
-    }
-  }
-
-  private static class FlattenPCollectionTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          Flatten.FlattenPCollectionList<T>> {
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void translateNode(
-        Flatten.FlattenPCollectionList<T> transform,
-        FlinkBatchTranslationContext context) {
-
-      List<TaggedPValue> allInputs = context.getInputs(transform);
-      DataSet<WindowedValue<T>> result = null;
-
-      if (allInputs.isEmpty()) {
-
-        // create an empty dummy source to satisfy downstream operations
-        // we cannot create an empty source in Flink, therefore we have to
-        // add the flatMap that simply never forwards the single element
-        DataSource<String> dummySource =
-            context.getExecutionEnvironment().fromElements("dummy");
-        result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
-          @Override
-          public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
-            // never return anything
-          }
-        }).returns(
-            new CoderTypeInformation<>(
-                WindowedValue.getFullCoder(
-                    (Coder<T>) VoidCoder.of(),
-                    GlobalWindow.Coder.INSTANCE)));
-      } else {
-        for (TaggedPValue taggedPc : allInputs) {
-          checkArgument(
-              taggedPc.getValue() instanceof PCollection,
-              "Got non-PCollection input to flatten: %s of type %s",
-              taggedPc.getValue(),
-              taggedPc.getValue().getClass().getSimpleName());
-          PCollection<T> collection = (PCollection<T>) taggedPc.getValue();
-          DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
-          if (result == null) {
-            result = current;
-          } else {
-            result = result.union(current);
-          }
-        }
-      }
-
-      // insert a dummy filter, there seems to be a bug in Flink
-      // that produces duplicate elements after the union in some cases
-      // if we don't
-      result = result.filter(new FilterFunction<WindowedValue<T>>() {
-        @Override
-        public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
-          return true;
-        }
-      }).name("UnionFixFilter");
-      context.setOutputDataSet(context.getOutput(transform), result);
-    }
-  }
-
-  private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          View.CreatePCollectionView<ElemT, ViewT>> {
-
-    @Override
-    public void translateNode(
-        View.CreatePCollectionView<ElemT, ViewT> transform,
-        FlinkBatchTranslationContext context) {
-      DataSet<WindowedValue<ElemT>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      PCollectionView<ViewT> input = transform.getView();
-
-      context.setSideInputDataSet(input, inputDataSet);
-    }
-  }
-
-  private static void transformSideInputs(
-      List<PCollectionView<?>> sideInputs,
-      SingleInputUdfOperator<?, ?, ?> outputDataSet,
-      FlinkBatchTranslationContext context) {
-    // get corresponding Flink broadcast DataSets
-    for (PCollectionView<?> input : sideInputs) {
-      DataSet<?> broadcastSet = context.getSideInputDataSet(input);
-      outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
-    }
-  }
-
-  private FlinkBatchTransformTranslators() {}
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
deleted file mode 100644
index 1f91e5e..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Helper for {@link FlinkBatchPipelineTranslator} and translators in
- * {@link FlinkBatchTransformTranslators}.
- */
-public class FlinkBatchTranslationContext {
-
-  private final Map<PValue, DataSet<?>> dataSets;
-  private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
-
-  /**
-   * For keeping track about which DataSets don't have a successor. We
-   * need to terminate these with a discarding sink because the Beam
-   * model allows dangling operations.
-   */
-  private final Map<PValue, DataSet<?>> danglingDataSets;
-
-  private final ExecutionEnvironment env;
-  private final PipelineOptions options;
-
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-  // ------------------------------------------------------------------------
-
-  public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
-    this.env = env;
-    this.options = options;
-    this.dataSets = new HashMap<>();
-    this.broadcastDataSets = new HashMap<>();
-
-    this.danglingDataSets = new HashMap<>();
-  }
-
-  // ------------------------------------------------------------------------
-
-  public Map<PValue, DataSet<?>> getDanglingDataSets() {
-    return danglingDataSets;
-  }
-
-  public ExecutionEnvironment getExecutionEnvironment() {
-    return env;
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
-    // assume that the DataSet is used as an input if retrieved here
-    danglingDataSets.remove(value);
-    return (DataSet<WindowedValue<T>>) dataSets.get(value);
-  }
-
-  public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
-    if (!dataSets.containsKey(value)) {
-      dataSets.put(value, set);
-      danglingDataSets.put(value, set);
-    }
-  }
-
-  /**
-   * Sets the AppliedPTransform which carries input/output.
-   * @param currentTransform
-   */
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
-    return (DataSet<T>) broadcastDataSets.get(value);
-  }
-
-  public <ViewT, ElemT> void setSideInputDataSet(
-      PCollectionView<ViewT> value,
-      DataSet<WindowedValue<ElemT>> set) {
-    if (!broadcastDataSets.containsKey(value)) {
-      broadcastDataSets.put(value, set);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
-    return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
-      Coder<T> coder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-        WindowedValue.getFullCoder(
-            coder,
-            windowingStrategy.getWindowFn().windowCoder());
-
-    return new CoderTypeInformation<>(windowedValueCoder);
-  }
-
-  List<TaggedPValue> getInputs(PTransform<?, ?> transform) {
-    return currentTransform.getInputs();
-  }
-
-  @SuppressWarnings("unchecked")
-  <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs()).getValue();
-  }
-
-  List<TaggedPValue> getOutputs(PTransform<?, ?> transform) {
-    return currentTransform.getOutputs();
-  }
-
-  @SuppressWarnings("unchecked")
-  <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getOutputs()).getValue();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
deleted file mode 100644
index cba28e4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.sdk.Pipeline;
-
-/**
- * The role of this class is to translate the Beam operators to
- * their Flink counterparts. If we have a streaming job, this is instantiated as a
- * {@link FlinkStreamingPipelineTranslator}. In other case, i.e. for a batch job,
- * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
- * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into
- * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
- * {@link org.apache.flink.api.java.DataSet} (for batch) one.
- */
-public abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
-  /**
-   * Translates the pipeline by passing this class as a visitor.
-   * @param pipeline The pipeline to be translated
-   */
-  public void translate(Pipeline pipeline) {
-    pipeline.traverseTopologically(this);
-  }
-
-  /**
-   * Utility formatting method.
-   * @param n number of spaces to generate
-   * @return String with "|" followed by n spaces
-   */
-  protected static String genSpaces(int n) {
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      builder.append("|   ");
-    }
-    return builder.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/e0e7e52b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
deleted file mode 100644
index 93bf0cc..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
- * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
- * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- *
- */
-public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
-
-  /** The necessary context in the case of a straming job. */
-  private final FlinkStreamingTranslationContext streamingContext;
-
-  private int depth = 0;
-
-  public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
-    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // --------------------------------------------------------------------------------------------
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-    this.depth++;
-
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null) {
-      StreamTransformTranslator<?> translator =
-          FlinkStreamingTransformTranslators.getTranslator(transform);
-
-      if (translator != null && applyCanTranslate(transform, node, translator)) {
-        applyStreamingTransform(transform, node, translator);
-        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
-        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-      }
-    }
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    this.depth--;
-    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
-    // get the transformation corresponding to hte node we are
-    // currently visiting and translate it into its Flink alternative.
-
-    PTransform<?, ?> transform = node.getTransform();
-    StreamTransformTranslator<?> translator =
-        FlinkStreamingTransformTranslators.getTranslator(transform);
-
-    if (translator == null || !applyCanTranslate(transform, node, translator)) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException(
-          "The transform " + transform + " is currently not supported.");
-    }
-    applyStreamingTransform(transform, node, translator);
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformHierarchy.Node producer) {
-    // do nothing here
-  }
-
-  private <T extends PTransform<?, ?>> void applyStreamingTransform(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      StreamTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the streamingContext
-    streamingContext.setCurrentTransform(node.toAppliedPTransform());
-    typedTranslator.translateNode(typedTransform, streamingContext);
-  }
-
-  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      StreamTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
-    streamingContext.setCurrentTransform(node.toAppliedPTransform());
-
-    return typedTranslator.canTranslate(typedTransform, streamingContext);
-  }
-
-  /**
-   * The interface that every Flink translator of a Beam operator should implement.
-   * This interface is for <b>streaming</b> jobs. For examples of such translators see
-   * {@link FlinkStreamingTransformTranslators}.
-   */
-  abstract static class StreamTransformTranslator<T extends PTransform> {
-
-    /**
-     * Translate the given transform.
-     */
-    abstract void translateNode(T transform, FlinkStreamingTranslationContext context);
-
-    /**
-     * Returns true iff this translator can translate the given transform.
-     */
-    boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
-      return true;
-    }
-  }
-}


Mime
View raw message