beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhule...@apache.org
Subject [beam] branch master updated: Pass pipeline options from caller to expansion service (#11574)
Date Tue, 05 May 2020 20:40:09 GMT
This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e15717  Pass pipeline options from caller to expansion service (#11574)
5e15717 is described below

commit 5e1571760b61b8ce247d5375b71c8df4d69d6409
Author: Brian Hulette <bhulette@google.com>
AuthorDate: Tue May 5 13:39:52 2020 -0700

    Pass pipeline options from caller to expansion service (#11574)
---
 model/job-management/src/main/proto/beam_expansion_api.proto  |  4 ++++
 .../apache/beam/sdk/expansion/service/ExpansionService.java   |  6 +++++-
 sdks/python/apache_beam/runners/job/utils.py                  | 11 +++++++++++
 .../python/apache_beam/runners/portability/portable_runner.py |  9 +--------
 sdks/python/apache_beam/transforms/external.py                |  9 ++++++++-
 5 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/model/job-management/src/main/proto/beam_expansion_api.proto b/model/job-management/src/main/proto/beam_expansion_api.proto
index e358c56..c2c9a72 100644
--- a/model/job-management/src/main/proto/beam_expansion_api.proto
+++ b/model/job-management/src/main/proto/beam_expansion_api.proto
@@ -30,6 +30,7 @@ option java_package = "org.apache.beam.model.expansion.v1";
 option java_outer_classname = "ExpansionApi";
 
 import "beam_runner_api.proto";
+import "google/protobuf/struct.proto";
 
 message ExpansionRequest {
   // Set of components needed to interpret the transform, or which
@@ -46,6 +47,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // The pipeline options specified by the caller
+  google.protobuf.Struct pipeline_options = 4;
 }
 
 message ExpansionResponse {
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index e19754d..0e88313 100644
--- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -38,6 +38,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.SdkComponents;
@@ -313,8 +314,11 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB
         request.getTransform().getUniqueName(),
         request.getTransform().getSpec().getUrn());
     LOG.debug("Full transform: {}", request.getTransform());
+
+    Pipeline pipeline =
+        Pipeline.create(PipelineOptionsTranslation.fromProto(request.getPipelineOptions()));
+
     Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
-    Pipeline pipeline = Pipeline.create();
     ExperimentalOptions.addExperiment(
         pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
     RehydratedComponents rehydratedComponents =
diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py
index 1a90adb..672ab4c 100644
--- a/sdks/python/apache_beam/runners/job/utils.py
+++ b/sdks/python/apache_beam/runners/job/utils.py
@@ -28,6 +28,17 @@ from google.protobuf import json_format
 from google.protobuf import struct_pb2
 
 
+def pipeline_options_dict_to_struct(options):
+  # type: (dict) -> struct_pb2.Struct
+  # TODO: Define URNs for options.
+  # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+  return dict_to_struct({
+      'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
+      for k,
+      v in options.items() if v is not None
+  })
+
+
 def dict_to_struct(dict_obj):
   # type: (dict) -> struct_pb2.Struct
   return json_format.ParseDict(dict_obj, struct_pb2.Struct())
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 180a41e..b416352 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -164,14 +164,7 @@ class JobServiceHandle(object):
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
-    # TODO: Define URNs for options.
-    # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
-    p_options = {
-        'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
-        for k,
-        v in all_options.items() if v is not None
-    }
-    return job_utils.dict_to_struct(p_options)
+    return job_utils.pipeline_options_dict_to_struct(all_options)
 
   def prepare(self, proto_pipeline):
     # type: (beam_runner_api_pb2.Pipeline) -> beam_job_api_pb2.PrepareJobResponse
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 4f31cbb..cfb5e67 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -37,6 +37,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
 from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
 from apache_beam.runners import pipeline_context
+from apache_beam.runners.job import utils as job_utils
 from apache_beam.transforms import ptransform
 from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
 from apache_beam.typehints.trivial_inference import instance_to_type
@@ -309,10 +310,16 @@ class ExternalTransform(ptransform.PTransform):
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
     components = context.to_runner_api()
+
+    # Retain unknown options since they may only be relevant to the expanding
+    # SDK
+    options = pipeline._options.get_all_options(
+        drop_default=True, retain_unknown_options=True)
     request = beam_expansion_api_pb2.ExpansionRequest(
         components=components,
         namespace=self._namespace,  # type: ignore  # mypy thinks self._namespace is threading.local
-        transform=transform_proto)
+        transform=transform_proto,
+        pipeline_options=job_utils.pipeline_options_dict_to_struct(options))
 
     if isinstance(self._expansion_service, str):
       # Some environments may not support unsecure channels. Hence using a


Mime
View raw message