beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: Add option for toProto/fromProto translations in DirectRunner, disabled by default.
Date Thu, 07 Sep 2017 04:50:48 GMT
Repository: beam
Updated Branches:
  refs/heads/master 6dfd3a2b7 -> df1476d82


Add option for toProto/fromProto translations in DirectRunner, disabled by default.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/247b1334
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/247b1334
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/247b1334

Branch: refs/heads/master
Commit: 247b1334734bf70ebfb9354988fcb3840f41d55f
Parents: 6dfd3a2
Author: mingmxu <mingmxu@ebay.com>
Authored: Fri Sep 1 15:16:19 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Wed Sep 6 21:50:01 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/direct/DirectOptions.java |  8 ++++++++
 .../org/apache/beam/runners/direct/DirectRunner.java  | 14 +++++++++-----
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/247b1334/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 574ab46..af67306 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -74,4 +76,10 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions
{
       return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
     }
   }
+
+  @Experimental(Kind.CORE_RUNNERS_ONLY)
+  @Default.Boolean(false)
+  @Description("Control whether toProto/fromProto translations are applied to original Pipeline")
+  boolean isProtoTranslation();
+  void setProtoTranslation(boolean b);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/247b1334/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 642ce8f..35d55b1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -160,11 +160,15 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
   @Override
   public DirectPipelineResult run(Pipeline originalPipeline) {
     Pipeline pipeline;
-    try {
-      pipeline = PipelineTranslation.fromProto(
-          PipelineTranslation.toProto(originalPipeline));
-    } catch (IOException exception) {
-      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+    if (getPipelineOptions().isProtoTranslation()) {
+      try {
+        pipeline = PipelineTranslation.fromProto(
+            PipelineTranslation.toProto(originalPipeline));
+      } catch (IOException exception) {
+        throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+      }
+    } else {
+      pipeline = originalPipeline;
     }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);


Mime
View raw message