beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3884) Python SDK supports Impulse as a primitive transform
Date Thu, 20 Sep 2018 07:42:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3884?focusedWorklogId=145944&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145944 ]

ASF GitHub Bot logged work on BEAM-3884:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Sep/18 07:41
            Start Date: 20/Sep/18 07:41
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6376: [BEAM-3884] Implement Impulse primitive in FnApiRunner.
URL: https://github.com/apache/beam/pull/6376
 
This is a pull request merged from a forked repository. Because GitHub hides
the original diff once such a PR is merged, it is reproduced below for the
sake of provenance:

diff --git a/sdks/python/apache_beam/portability/python_urns.py b/sdks/python/apache_beam/portability/python_urns.py
index fd8dbbaa9bb..9f22313ec53 100644
--- a/sdks/python/apache_beam/portability/python_urns.py
+++ b/sdks/python/apache_beam/portability/python_urns.py
@@ -29,4 +29,6 @@
 PICKLED_WINDOWFN = "beam:windowfn:pickled_python:v1"
 PICKLED_VIEWFN = "beam:view_fn:pickled_python_data:v1"
 
+IMPULSE_READ_TRANSFORM = "beam:transform:read_from_impulse_python:v1"
+
 GENERIC_COMPOSITE_TRANSFORM = "beam:transform:generic_composite:v1"
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 593c6e3da21..31091313623 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -43,6 +43,7 @@
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability import common_urns
+from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -59,6 +60,13 @@
 standard_library.install_aliases()
 # This module is experimental. No backwards-compatibility guarantees.
 
+ENCODED_IMPULSE_VALUE = beam.coders.WindowedValueCoder(
+    beam.coders.BytesCoder(),
+    beam.coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
+        beam.transforms.window.GlobalWindows.windowed_value(''))
+
+IMPULSE_BUFFER_PREFIX = 'impulse:'
+
 
 class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
 
@@ -309,7 +317,23 @@ def deduplicate_read(self):
           new_transforms.append(transform)
         self.transforms = new_transforms
 
-    # Now define the "optimization" phases.
+    # Some helper functions.
+
+    def add_or_get_coder_id(coder_proto):
+      for coder_id, coder in pipeline_components.coders.items():
+        if coder == coder_proto:
+          return coder_id
+      new_coder_id = unique_name(pipeline_components.coders, 'coder')
+      pipeline_components.coders[new_coder_id].CopyFrom(coder_proto)
+      return new_coder_id
+
+    def windowed_coder_id(coder_id, window_coder_id):
+      proto = beam_runner_api_pb2.Coder(
+          spec=beam_runner_api_pb2.SdkFunctionSpec(
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.coders.WINDOWED_VALUE.urn)),
+          component_coder_ids=[coder_id, window_coder_id])
+      return add_or_get_coder_id(proto)
 
     safe_coders = {}
 
@@ -380,6 +404,63 @@ def wrap_unknown_coders(coder_id, with_bytes):
       safe_coders[new_coder_id] = wrap_unknown_coders(pcoll.coder_id, True)
       pcoll.coder_id = new_coder_id
 
+    # Now define the "optimization" phases.
+
+    def impulse_to_input(stages):
+      bytes_coder_id = add_or_get_coder_id(
+          beam.coders.BytesCoder().to_runner_api(None))
+      global_window_coder_id = add_or_get_coder_id(
+          beam.coders.coders.GlobalWindowCoder().to_runner_api(None))
+      globally_windowed_bytes_coder_id = windowed_coder_id(
+          bytes_coder_id, global_window_coder_id)
+
+      for stage in stages:
+        # First map Reads, if any, to Impulse + triggered read op.
+        for transform in list(stage.transforms):
+          if transform.spec.urn == common_urns.deprecated_primitives.READ.urn:
+            read_pc = only_element(transform.outputs.values())
+            read_pc_proto = pipeline_components.pcollections[read_pc]
+            impulse_pc = unique_name(
+                pipeline_components.pcollections, 'Impulse')
+            pipeline_components.pcollections[impulse_pc].CopyFrom(
+                beam_runner_api_pb2.PCollection(
+                    unique_name=impulse_pc,
+                    coder_id=globally_windowed_bytes_coder_id,
+                    windowing_strategy_id=read_pc_proto.windowing_strategy_id,
+                    is_bounded=read_pc_proto.is_bounded))
+            stage.transforms.remove(transform)
+            # TODO(robertwb): If this goes multi-process before fn-api
+            # read is default, expand into split + reshuffle + read.
+            stage.transforms.append(
+                beam_runner_api_pb2.PTransform(
+                    unique_name=transform.unique_name + '/Impulse',
+                    spec=beam_runner_api_pb2.FunctionSpec(
+                        urn=common_urns.primitives.IMPULSE.urn),
+                    outputs={'out': impulse_pc}))
+            stage.transforms.append(
+                beam_runner_api_pb2.PTransform(
+                    unique_name=transform.unique_name,
+                    spec=beam_runner_api_pb2.FunctionSpec(
+                        urn=python_urns.IMPULSE_READ_TRANSFORM,
+                        payload=transform.spec.payload),
+                    inputs={'in': impulse_pc},
+                    outputs={'out': read_pc}))
+
+        # Now map impulses to inputs.
+        for transform in list(stage.transforms):
+          if transform.spec.urn == common_urns.primitives.IMPULSE.urn:
+            stage.transforms.remove(transform)
+            impulse_pc = only_element(transform.outputs.values())
+            stage.transforms.append(
+                beam_runner_api_pb2.PTransform(
+                    unique_name=transform.unique_name,
+                    spec=beam_runner_api_pb2.FunctionSpec(
+                        urn=bundle_processor.DATA_INPUT_URN,
+                        payload=str(IMPULSE_BUFFER_PREFIX + impulse_pc)),
+                    outputs=transform.outputs))
+
+        yield stage
+
     def lift_combiners(stages):
       """Expands CombinePerKey into pre- and post-grouping stages.
 
@@ -389,22 +470,6 @@ def lift_combiners(stages):
 
       ... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ...
       """
-      def add_or_get_coder_id(coder_proto):
-        for coder_id, coder in pipeline_components.coders.items():
-          if coder == coder_proto:
-            return coder_id
-        new_coder_id = unique_name(pipeline_components.coders, 'coder')
-        pipeline_components.coders[new_coder_id].CopyFrom(coder_proto)
-        return new_coder_id
-
-      def windowed_coder_id(coder_id):
-        proto = beam_runner_api_pb2.Coder(
-            spec=beam_runner_api_pb2.SdkFunctionSpec(
-                spec=beam_runner_api_pb2.FunctionSpec(
-                    urn=common_urns.coders.WINDOWED_VALUE.urn)),
-            component_coder_ids=[coder_id, window_coder_id])
-        return add_or_get_coder_id(proto)
-
       for stage in stages:
         assert len(stage.transforms) == 1
         transform = stage.transforms[0]
@@ -453,7 +518,8 @@ def windowed_coder_id(coder_id):
           pipeline_components.pcollections[precombined_pcoll_id].CopyFrom(
               beam_runner_api_pb2.PCollection(
                   unique_name=transform.unique_name + '/Precombine.out',
-                  coder_id=windowed_coder_id(key_accumulator_coder_id),
+                  coder_id=windowed_coder_id(
+                      key_accumulator_coder_id, window_coder_id),
                   windowing_strategy_id=input_pcoll.windowing_strategy_id,
                   is_bounded=input_pcoll.is_bounded))
 
@@ -462,7 +528,8 @@ def windowed_coder_id(coder_id):
           pipeline_components.pcollections[grouped_pcoll_id].CopyFrom(
               beam_runner_api_pb2.PCollection(
                   unique_name=transform.unique_name + '/Group.out',
-                  coder_id=windowed_coder_id(key_accumulator_iter_coder_id),
+                  coder_id=windowed_coder_id(
+                      key_accumulator_iter_coder_id, window_coder_id),
                   windowing_strategy_id=output_pcoll.windowing_strategy_id,
                   is_bounded=output_pcoll.is_bounded))
 
@@ -471,7 +538,8 @@ def windowed_coder_id(coder_id):
           pipeline_components.pcollections[merged_pcoll_id].CopyFrom(
               beam_runner_api_pb2.PCollection(
                   unique_name=transform.unique_name + '/Merge.out',
-                  coder_id=windowed_coder_id(key_accumulator_coder_id),
+                  coder_id=windowed_coder_id(
+                      key_accumulator_coder_id, window_coder_id),
                   windowing_strategy_id=output_pcoll.windowing_strategy_id,
                   is_bounded=output_pcoll.is_bounded))
 
@@ -840,7 +908,8 @@ def leaf_transforms(root_ids):
     # Apply each phase in order.
     for phase in [
         annotate_downstream_side_inputs, fix_side_input_pcoll_coders,
-        lift_combiners, expand_gbk, sink_flattens, greedily_fuse, sort_stages]:
+        lift_combiners, expand_gbk, sink_flattens, greedily_fuse,
+        impulse_to_input, sort_stages]:
       logging.info('%s %s %s', '=' * 20, phase, '=' * 20)
       stages = list(phase(stages))
       logging.debug('Stages: %s', [str(s) for s in stages])
@@ -885,7 +954,10 @@ def extract_endpoints(stage):
           pcoll_id = transform.spec.payload
           if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
             target = transform.unique_name, only_element(transform.outputs)
-            data_input[target] = pcoll_buffers[pcoll_id]
+            if pcoll_id.startswith(IMPULSE_BUFFER_PREFIX):
+              data_input[target] = [ENCODED_IMPULSE_VALUE]
+            else:
+              data_input[target] = pcoll_buffers[pcoll_id]
             coder_id = pipeline_components.pcollections[
                 only_element(transform.outputs.values())].coder_id
           elif transform.spec.urn == bundle_processor.DATA_OUTPUT_URN:
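
For context on the ENCODED_IMPULSE_VALUE constant introduced above: the single
element an Impulse emits is an empty byte string in the global window,
nested-encoded with a windowed bytes coder. A minimal round-trip sketch,
assuming the coder APIs shown in the diff (on Python 3 the element is b''
rather than ''):

    import apache_beam as beam
    from apache_beam.transforms import window

    # Windowed coder pairing raw bytes with the global window, as in the
    # ENCODED_IMPULSE_VALUE definition above.
    coder_impl = beam.coders.WindowedValueCoder(
        beam.coders.BytesCoder(),
        beam.coders.coders.GlobalWindowCoder()).get_impl()

    # Encode the lone impulse element: an empty byte string in GlobalWindow.
    encoded = coder_impl.encode_nested(
        window.GlobalWindows.windowed_value(b''))

    # Decoding recovers a WindowedValue wrapping b''.
    assert coder_impl.decode_nested(encoded).value == b''
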
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 277a817dd2a..91de6d2a5d8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -386,7 +386,7 @@ def test_progress_metrics(self):
       self.assertEqual(
           4,
           pregbk_metrics.ptransforms['Create/Read']
-          .processed_elements.measured.output_element_counts['None'])
+          .processed_elements.measured.output_element_counts['out'])
       self.assertEqual(
           4,
           pregbk_metrics.ptransforms['Map(sleep)']
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 33c8744a7b4..1e6dd4d6ef7 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -483,6 +483,19 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
       consumers)
 
 
+@BeamTransformFactory.register_urn(
+    python_urns.IMPULSE_READ_TRANSFORM, beam_runner_api_pb2.ReadPayload)
+def create(factory, transform_id, transform_proto, parameter, consumers):
+  return operations.ImpulseReadOperation(
+      transform_proto.unique_name,
+      factory.counter_factory,
+      factory.state_sampler,
+      consumers,
+      iobase.SourceBase.from_runner_api(
+          parameter.source, factory.context),
+      factory.get_only_output_coder(transform_proto))
+
+
 @BeamTransformFactory.register_urn(OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN, None)
 def create(factory, transform_id, transform_proto, serialized_fn, consumers):
   return _create_pardo_operation(
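
The registration above ties the new URN to an operation constructor via
BeamTransformFactory.register_urn. As a rough illustration of that URN-keyed
dispatch pattern (the registry and function names below are simplified
stand-ins, not the actual factory):

    # Simplified stand-in for the worker's URN-keyed transform factory.
    _CREATORS = {}

    def register_urn(urn):
        def decorator(fn):
            _CREATORS[urn] = fn
            return fn
        return decorator

    @register_urn("beam:transform:read_from_impulse_python:v1")
    def _create_impulse_read(payload):
        # The real factory deserializes a ReadPayload proto and builds an
        # ImpulseReadOperation; here we just echo the inputs.
        return ('ImpulseReadOperation', payload)

    op = _CREATORS["beam:transform:read_from_impulse_python:v1"](b'...')
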
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index f27de8d0170..8c2b9732623 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -70,6 +70,12 @@ cdef class ReadOperation(Operation):
   cpdef start(self)
 
 
+cdef class ImpulseReadOperation(Operation):
+  cdef object source
+  @cython.locals(windowed_value=WindowedValue)
+  cpdef process(self, WindowedValue impulse)
+
+
 cdef class DoOperation(Operation):
   cdef object dofn_runner
   cdef Receiver dofn_receiver
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 0488fe928d3..ff1f11052ad 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -245,6 +245,29 @@ def start(self):
         self.output(windowed_value)
 
 
+class ImpulseReadOperation(Operation):
+
+  def __init__(self, name_context, counter_factory, state_sampler,
+               consumers, source, output_coder):
+    super(ImpulseReadOperation, self).__init__(
+        name_context, None, counter_factory, state_sampler)
+    self.source = source
+    self.receivers = [
+        ConsumerSet(
+            self.counter_factory, self.name_context.step_name, 0,
+            next(iter(consumers.values())), output_coder)]
+
+  def process(self, unused_impulse):
+    with self.scoped_process_state:
+      range_tracker = self.source.get_range_tracker(None, None)
+      for value in self.source.read(range_tracker):
+        if isinstance(value, WindowedValue):
+          windowed_value = value
+        else:
+          windowed_value = _globally_windowed_value.with_value(value)
+        self.output(windowed_value)
+
+
 class InMemoryWriteOperation(Operation):
   """A write operation that will write to an in-memory sink."""
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145944)
    Time Spent: 3.5h  (was: 3h 20m)

> Python SDK supports Impulse as a primitive transform
> ----------------------------------------------------
>
>                 Key: BEAM-3884
>                 URL: https://issues.apache.org/jira/browse/BEAM-3884
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Robert Bradshaw
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Portable runners require Impulse to be the only root node of a pipeline. The
> Python SDK should provide this for pipeline construction.
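
With these changes, Impulse is usable as the sole root of a Python pipeline.
A minimal usage sketch, assuming a Beam release where beam.Impulse and the
FnApiRunner support above are available:

    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         | beam.Impulse()                    # one empty-bytes element
         | beam.FlatMap(lambda _: range(3))  # stand-in for a triggered read
         | beam.Map(print))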



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
