beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [beam] branch master updated: [BEAM-3719] Adds support for reading side-inputs from SDFs
Date Thu, 01 Mar 2018 21:39:24 GMT
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c6cf20b  [BEAM-3719] Adds support for reading side-inputs from SDFs
c6cf20b is described below

commit c6cf20b6a1edd8cdd708c8511084b7806dbe80b2
Author: Chamikara Jayalath <chamikara@google.com>
AuthorDate: Thu Mar 1 13:39:20 2018 -0800

    [BEAM-3719] Adds support for reading side-inputs from SDFs
---
 sdks/python/apache_beam/runners/common.pxd         |  3 +-
 sdks/python/apache_beam/runners/common.py          | 40 ++++++++++++++++------
 .../runners/direct/sdf_direct_runner.py            | 17 ++++++---
 .../runners/direct/sdf_direct_runner_test.py       | 29 +++++++++++-----
 sdks/python/apache_beam/runners/sdf_common.py      |  6 ++--
 5 files changed, 70 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index fed0c2c..9b0871f 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -51,7 +51,8 @@ cdef class DoFnInvoker(object):
 
   cpdef invoke_process(self, WindowedValue windowed_value,
                        restriction_tracker=*,
-                       OutputProcessor output_processor=*)
+                       OutputProcessor output_processor=*,
+                       additional_args=*, additional_kwargs=*)
   cpdef invoke_start_bundle(self)
   cpdef invoke_finish_bundle(self)
   cpdef invoke_split(self, element, restriction)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 39e5e99..124d7d3 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -180,8 +180,14 @@ class DoFnInvoker(object):
         signature: a DoFnSignature for the DoFn being invoked.
         context: Context to be used when invoking the DoFn (deprecated).
         side_inputs: side inputs to be used when invoking the process method.
-        input_args: arguments to be used when invoking the process method
-        input_kwargs: kwargs to be used when invoking the process method.
+        input_args: arguments to be used when invoking the process method. Some
+                    of the arguments given here might be placeholders (for
+                    example for side inputs) that get filled before invoking the
+                    process method.
+        input_kwargs: keyword arguments to be used when invoking the process
+                      method. Some of the keyword arguments given here might be
+                      placeholders (for example for side inputs) that get filled
+                      before invoking the process method.
         process_invocation: If True, this function may return an invoker that
                             performs extra optimizations for invoking process()
                             method efficiently.
@@ -199,7 +205,8 @@ class DoFnInvoker(object):
           signature, context, side_inputs, input_args, input_kwargs)
 
   def invoke_process(self, windowed_value, restriction_tracker=None,
-                     output_processor=None):
+                     output_processor=None,
+                     additional_args=None, additional_kwargs=None):
     """Invokes the DoFn.process() function.
 
     Args:
@@ -207,6 +214,10 @@ class DoFnInvoker(object):
                       process() method should be invoked along with the window
                       the element belongs to.
       output_processor: if provided, the given OutputProcessor will be used.
+      additional_args: additional arguments to be passed to the current
+                      `DoFn.process()` invocation, usually as side inputs.
+      additional_kwargs: additional keyword arguments to be passed to the
+                         current `DoFn.process()` invocation.
     """
     raise NotImplementedError
 
@@ -265,7 +276,8 @@ class SimpleInvoker(DoFnInvoker):
     self.process_method = signature.process_method.method_value
 
   def invoke_process(self, windowed_value, restriction_tracker=None,
-                     output_processor=None):
+                     output_processor=None,
+                     additional_args=None, additional_kwargs=None):
     if not output_processor:
       output_processor = self.output_processor
     output_processor.process_outputs(
@@ -351,7 +363,13 @@ class PerWindowInvoker(DoFnInvoker):
     self.kwargs_for_process = input_kwargs
 
   def invoke_process(self, windowed_value, restriction_tracker=None,
-                     output_processor=None):
+                     output_processor=None,
+                     additional_args=None, additional_kwargs=None):
+    if not additional_args:
+      additional_args = []
+    if not additional_kwargs:
+      additional_kwargs = {}
+
     if not output_processor:
       output_processor = self.output_processor
     self.context.set_element(windowed_value)
@@ -359,7 +377,6 @@ class PerWindowInvoker(DoFnInvoker):
     # or if the process accesses the window parameter. We can just call it once
     # otherwise as none of the arguments are changing
 
-    additional_kwargs = {}
     if restriction_tracker:
       restriction_tracker_param = _find_param_with_default(
           self.signature.process_method,
@@ -373,18 +390,21 @@ class PerWindowInvoker(DoFnInvoker):
       for w in windowed_value.windows:
         self._invoke_per_window(
             WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)),
-            additional_kwargs, output_processor)
+            additional_args, additional_kwargs, output_processor)
     else:
       self._invoke_per_window(
-          windowed_value, additional_kwargs, output_processor)
+          windowed_value, additional_args, additional_kwargs, output_processor)
 
   def _invoke_per_window(
-      self, windowed_value, additional_kwargs, output_processor):
+      self, windowed_value, additional_args,
+      additional_kwargs, output_processor):
     if self.has_windowed_inputs:
       window, = windowed_value.windows
+      side_inputs = [si[window] for si in self.side_inputs]
+      side_inputs.extend(additional_args)
       args_for_process, kwargs_for_process = util.insert_values_in_args(
           self.args_for_process, self.kwargs_for_process,
-          [si[window] for si in self.side_inputs])
+          side_inputs)
     else:
       args_for_process, kwargs_for_process = (
           self.args_for_process, self.kwargs_for_process)
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index aa247aa..610664b 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -59,8 +59,15 @@ class ProcessKeyedElementsViaKeyedWorkItems(PTransform):
     self._process_keyed_elements_transform = process_keyed_elements_transform
 
   def expand(self, pcoll):
-    return pcoll | beam.core.GroupByKey() | ProcessElements(
+    process_elements = ProcessElements(
         self._process_keyed_elements_transform)
+    process_elements.args = (
+        self._process_keyed_elements_transform.ptransform_args)
+    process_elements.kwargs = (
+        self._process_keyed_elements_transform.ptransform_kwargs)
+    process_elements.side_inputs = (
+        self._process_keyed_elements_transform.ptransform_side_inputs)
+    return pcoll | beam.core.GroupByKey() | process_elements
 
 
 class ProcessElements(PTransform):
@@ -176,7 +183,7 @@ class ProcessFn(beam.DoFn):
                       SDFProcessElementInvoker)
 
     output_values = self._process_element_invoker.invoke_process_element(
-        self.sdf_invoker, windowed_element, tracker)
+        self.sdf_invoker, windowed_element, tracker, *args, **kwargs)
 
     sdf_result = None
     for output in output_values:
@@ -270,7 +277,8 @@ class SDFProcessElementInvoker(object):
   def test_method(self):
     raise ValueError
 
-  def invoke_process_element(self, sdf_invoker, element, tracker):
+  def invoke_process_element(
+      self, sdf_invoker, element, tracker, *args, **kwargs):
     """Invokes `process()` method of a Splittable `DoFn` for a given element.
 
      Args:
@@ -302,7 +310,8 @@ class SDFProcessElementInvoker(object):
     output_processor = _OutputProcessor()
     Timer(self._max_duration, initiate_checkpoint).start()
     sdf_invoker.invoke_process(
-        element, restriction_tracker=tracker, output_processor=output_processor)
+        element, restriction_tracker=tracker, output_processor=output_processor,
+        additional_args=args, additional_kwargs=kwargs)
 
     assert output_processor.output_iter is not None
     output_count = 0
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
index 7ab6dde..e8ef9b6 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
@@ -22,9 +22,12 @@ import os
 import unittest
 
 import apache_beam as beam
+from apache_beam import Create
 from apache_beam import DoFn
 from apache_beam.io import filebasedsource_test
 from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.pvalue import AsList
+from apache_beam.pvalue import AsSingleton
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -100,10 +103,13 @@ class ExpandStrings(DoFn):
     self._record_window = record_window
 
   def process(
-      self, element, window=beam.DoFn.WindowParam,
-      restriction_tracker=ExpandStringsProvider(), side=None,
+      self, element, side1, side2, side3, window=beam.DoFn.WindowParam,
+      restriction_tracker=ExpandStringsProvider(),
       *args, **kwargs):
-    side = side or []
+    side = []
+    side.extend(side1)
+    side.extend(side2)
+    side.extend(side3)
     assert isinstance(restriction_tracker, OffsetRestrictionTracker)
     side = list(side)
     for i in range(restriction_tracker.start_position(),
@@ -212,7 +218,7 @@ class SDFDirectRunnerTest(unittest.TestCase):
                                           TimestampedValue(('B', t), t)])
                 | beam.WindowInto(SlidingWindows(10, 5),
                                   accumulation_mode=AccumulationMode.DISCARDING)
-                | beam.ParDo(ExpandStrings(record_window=True)))
+                | beam.ParDo(ExpandStrings(record_window=True), [], [], []))
 
       expected_result = [
           'A:1:-5', 'A:1:0', 'A:3:-5', 'A:3:0', 'A:5:0', 'A:5:5', 'A:10:5',
@@ -222,11 +228,18 @@ class SDFDirectRunnerTest(unittest.TestCase):
 
   def test_sdf_with_side_inputs(self):
     with TestPipeline() as p:
+      side1 = p | 'Create1' >> Create(['1', '2'])
+      side2 = p | 'Create2' >> Create(['3', '4'])
+      side3 = p | 'Create3' >> Create(['5'])
       result = (p
-                | 'create_main' >> beam.Create(['1', '3', '5'])
-                | beam.ParDo(ExpandStrings(), side=['1', '3']))
-
-      expected_result = ['1:1', '3:1', '5:1', '1:3', '3:3', '5:3']
+                | 'create_main' >> beam.Create(['a', 'b', 'c'])
+                | beam.ParDo(ExpandStrings(), AsList(side1), AsList(side2),
+                             AsSingleton(side3)))
+
+      expected_result = []
+      for c in ['a', 'b', 'c']:
+        for i in range(5):
+          expected_result.append(c + ':' + str(i+1))
       assert_that(result, equal_to(expected_result))
 
 
diff --git a/sdks/python/apache_beam/runners/sdf_common.py b/sdks/python/apache_beam/runners/sdf_common.py
index a3e1418..5b35544 100644
--- a/sdks/python/apache_beam/runners/sdf_common.py
+++ b/sdks/python/apache_beam/runners/sdf_common.py
@@ -77,7 +77,8 @@ class SplittableParDo(PTransform):
 
     return keyed_elements | ProcessKeyedElements(
         sdf, element_coder, restriction_coder,
-        pcoll.windowing, self._ptransform.args, self._ptransform.kwargs)
+        pcoll.windowing, self._ptransform.args, self._ptransform.kwargs,
+        self._ptransform.side_inputs)
 
 
 class ElementAndRestriction(object):
@@ -153,13 +154,14 @@ class ProcessKeyedElements(PTransform):
 
   def __init__(
       self, sdf, element_coder, restriction_coder, windowing_strategy,
-      ptransform_args, ptransform_kwargs):
+      ptransform_args, ptransform_kwargs, ptransform_side_inputs):
     self.sdf = sdf
     self.element_coder = element_coder
     self.restriction_coder = restriction_coder
     self.windowing_strategy = windowing_strategy
     self.ptransform_args = ptransform_args
     self.ptransform_kwargs = ptransform_kwargs
+    self.ptransform_side_inputs = ptransform_side_inputs
 
   def expand(self, pcoll):
     return pvalue.PCollection(pcoll.pipeline)

-- 
To stop receiving notification emails like this one, please contact
chamikara@apache.org.

Mime
View raw message