beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From goe...@apache.org
Subject [beam] branch master updated: [BEAM-6865] share hasUnboundedPCollections method
Date Wed, 03 Apr 2019 20:39:13 GMT
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ae3a6  [BEAM-6865] share hasUnboundedPCollections method
     new 99c1cb3  Merge pull request #8212 from ibzib/check-unbounded
76ae3a6 is described below

commit 76ae3a6746e21b7ffdaf5b3cbfe99f3c6107eb72
Author: Kyle Weaver <kcweaver@google.com>
AuthorDate: Wed Apr 3 10:23:00 2019 -0700

    [BEAM-6865] share hasUnboundedPCollections method
---
 .../org/apache/beam/runners/flink/FlinkPipelineRunner.java  | 13 +------------
 .../fnexecution/translation/PipelineTranslatorUtils.java    | 13 +++++++++++++
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 7bd2709..f3dc60f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,9 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
 
-import java.util.Collection;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -89,14 +88,4 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
 
     return FlinkRunner.createPipelineResult(result, pipelineOptions);
   }
-
-  /** Indicates whether the given pipeline has any unbounded PCollections. */
-  private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
-    checkNotNull(pipeline);
-    Collection<RunnerApi.PCollection> pCollecctions =
-        pipeline.getComponents().getPcollectionsMap().values();
-    // Assume that all PCollections are consumed at some point in the pipeline.
-    return pCollecctions.stream()
-        .anyMatch(pc -> pc.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED);
-  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index c39b8b7..a0ea37e 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -17,8 +17,12 @@
  */
 package org.apache.beam.runners.fnexecution.translation;
 
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.IOException;
+import java.util.Collection;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
@@ -76,4 +80,13 @@ public final class PipelineTranslatorUtils {
           e);
     }
   }
+
+  /** Indicates whether the given pipeline has any unbounded PCollections. */
+  public static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
+    checkNotNull(pipeline);
+    Collection<PCollection> pCollecctions = pipeline.getComponents().getPcollectionsMap().values();
+    // Assume that all PCollections are consumed at some point in the pipeline.
+    return pCollecctions.stream()
+        .anyMatch(pc -> pc.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED);
+  }
 }


Mime
View raw message