beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] beam git commit: Remove Unnecessary DataflowRunner Overrides
Date Thu, 16 Feb 2017 19:45:28 GMT
Repository: beam
Updated Branches:
  refs/heads/master 55340e617 -> 817688aac


Remove Unnecessary DataflowRunner Overrides

The removed overrides are indistinguishable from the transform that they
are overriding.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d8865de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d8865de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d8865de

Branch: refs/heads/master
Commit: 4d8865def33355551d16096831de5cf58c28272d
Parents: 55340e6
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Feb 16 09:14:47 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu Feb 16 11:45:17 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 36 +++-----------------
 1 file changed, 5 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4d8865de/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index db6a7d9..f1270db 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -336,7 +336,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
       builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
       builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
       builder.put(Read.Bounded.class, StreamingBoundedRead.class);
-      builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.
       builder.put(PubsubIO.Read.PubsubBoundedReader.class, UnsupportedIO.class);
@@ -351,7 +350,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
       }
     } else {
       builder.put(Read.Unbounded.class, UnsupportedIO.class);
-      builder.put(Window.Bound.class, AssignWindows.class);
       builder.put(Write.Bound.class, BatchWrite.class);
       // In batch mode must use the custom Pubsub bounded source/sink.
       builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
@@ -376,32 +374,18 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
 
-    if (Combine.GroupedValues.class.equals(transform.getClass())
-        || GroupByKey.class.equals(transform.getClass())) {
-
+    if (Combine.GroupedValues.class.equals(transform.getClass())) {
       // For both Dataflow runners (streaming and batch), GroupByKey and GroupedValues are
       // primitives. Returning a primitive output instead of the expanded definition
       // signals to the translator that translation is necessary.
       @SuppressWarnings("unchecked")
       PCollection<?> pc = (PCollection<?>) input;
       @SuppressWarnings("unchecked")
-      OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal(
-          pc.getPipeline(),
-          transform instanceof GroupByKey
-              ? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy())
-              : pc.getWindowingStrategy(),
-          pc.isBounded());
+      OutputT outputT =
+          (OutputT)
+              PCollection.createPrimitiveOutputInternal(
+                  pc.getPipeline(), pc.getWindowingStrategy(), pc.isBounded());
       return outputT;
-    } else if (Window.Bound.class.equals(transform.getClass())) {
-      /*
-       * TODO: make this the generic way overrides are applied (using super.apply() rather than
-       * Pipeline.applyTransform(); this allows the apply method to be replaced without inserting
-       * additional nodes into the graph.
-       */
-      // casting to wildcard
-      @SuppressWarnings("unchecked")
-      OutputT windowed = (OutputT) applyWindow((Window.Bound<?>) transform, (PCollection<?>) input);
-      return windowed;
     } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
         && ((PCollectionList<?>) input).size() == 0) {
      // This can cause downstream coder inference to be screwy. Most of the time, that won't be
@@ -432,16 +416,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
     }
   }
 
-  private <T> PCollection<T> applyWindow(
-      Window.Bound<?> intitialTransform, PCollection<?> initialInput) {
-    // types are matched at compile time
-    @SuppressWarnings("unchecked")
-    Window.Bound<T> transform = (Window.Bound<T>) intitialTransform;
-    @SuppressWarnings("unchecked")
-    PCollection<T> input = (PCollection<T>) initialInput;
-    return super.apply(new AssignWindows<>(transform), input);
-  }
-
   private String debuggerMessage(String projectId, String uniquifier) {
     return String.format("To debug your job, visit Google Cloud Debugger at: "
         + "https://console.developers.google.com/debug?project=%s&dbgee=%s",


Mime
View raw message