beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: BEAM-7141: add key value timer callback (#8739)
Date Tue, 25 Jun 2019 01:59:20 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fb8efe3  BEAM-7141: add key value timer callback (#8739)
fb8efe3 is described below

commit fb8efe3af66ddfe3ce7d4020c43d10d8467f0ab9
Author: Rakesh Kumar <rakeshcusat@gmail.com>
AuthorDate: Mon Jun 24 18:59:05 2019 -0700

    BEAM-7141: add key value timer callback (#8739)
    
    * BEAM-7141: Add key parameter in timer callback
    
    Why?
      The key parameter was missing from the timer callback,
      which made debugging harder.
---
 sdks/python/apache_beam/runners/common.pxd         |  2 +
 sdks/python/apache_beam/runners/common.py          | 27 ++++++++--
 sdks/python/apache_beam/runners/common_test.py     | 59 ++++++++++++++++++++++
 sdks/python/apache_beam/transforms/core.py         | 13 +++--
 .../apache_beam/transforms/userstate_test.py       | 11 ++--
 5 files changed, 99 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index b901c68..ebbd1bb 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -39,6 +39,7 @@ cdef class MethodWrapper(object):
   cdef object timer_args_to_replace
   cdef object timestamp_arg_name
   cdef object window_arg_name
+  cdef object key_arg_name
 
 
 cdef class DoFnSignature(object):
@@ -90,6 +91,7 @@ cdef class PerWindowInvoker(DoFnInvoker):
   cdef bint is_splittable
   cdef object restriction_tracker
   cdef WindowedValue current_windowed_value
+  cdef bint is_key_param_required
 
 
 cdef class DoFnRunner(Receiver):
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 6d821e3..89c2cb7 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -171,6 +171,7 @@ class MethodWrapper(object):
     self.timer_args_to_replace = {}
     self.timestamp_arg_name = None
     self.window_arg_name = None
+    self.key_arg_name = None
 
     for kw, v in zip(args[-len(defaults):], defaults):
       if isinstance(v, core.DoFn.StateParam):
@@ -183,6 +184,8 @@ class MethodWrapper(object):
         self.timestamp_arg_name = kw
       elif v == core.DoFn.WindowParam:
         self.window_arg_name = kw
+      elif v == core.DoFn.KeyParam:
+        self.key_arg_name = kw
 
   def invoke_timer_callback(self,
                             user_state_context,
@@ -201,6 +204,8 @@ class MethodWrapper(object):
       kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
     if self.window_arg_name:
       kwargs[self.window_arg_name] = window
+    if self.key_arg_name:
+      kwargs[self.key_arg_name] = key
 
     if kwargs:
       return self.method_value(**kwargs)
@@ -310,6 +315,13 @@ class DoFnInvoker(object):
   represented by a given DoFnSignature."""
 
   def __init__(self, output_processor, signature):
+    """
+    Initializes `DoFnInvoker`
+
+    :param output_processor: an OutputProcessor for receiving elements produced
+                             by invoking functions of the DoFn.
+    :param signature: a DoFnSignature for the DoFn being invoked
+    """
     self.output_processor = output_processor
     self.signature = signature
     self.user_state_context = None
@@ -474,6 +486,7 @@ class PerWindowInvoker(DoFnInvoker):
     self.restriction_tracker = None
     self.current_windowed_value = None
     self.bundle_finalizer_param = bundle_finalizer_param
+    self.is_key_param_required = False
 
     # Try to prepare all the arguments that can just be filled in
     # without any additional work. in the process function.
@@ -504,11 +517,14 @@ class PerWindowInvoker(DoFnInvoker):
       args_to_pick = len(arguments) - len(defaults) - self_in_args
       args_with_placeholders = input_args[:args_to_pick]
 
-    # Fill the OtherPlaceholders for context, window or timestamp
+    # Fill the OtherPlaceholders for context, key, window or timestamp
     remaining_args_iter = iter(input_args[args_to_pick:])
     for a, d in zip(arguments[-len(defaults):], defaults):
       if d == core.DoFn.ElementParam:
         args_with_placeholders.append(ArgPlaceholder(d))
+      elif d == core.DoFn.KeyParam:
+        self.is_key_param_required = True
+        args_with_placeholders.append(ArgPlaceholder(d))
       elif d == core.DoFn.WindowParam:
         args_with_placeholders.append(ArgPlaceholder(d))
       elif d == core.DoFn.TimestampParam:
@@ -625,18 +641,19 @@ class PerWindowInvoker(DoFnInvoker):
     # stateful DoFn, we set during __init__ self.has_windowed_inputs to be
     # True. Therefore, windows will be exploded coming into this method, and
     # we can rely on the window variable being set above.
-    if self.user_state_context:
+    if self.user_state_context or self.is_key_param_required:
       try:
         key, unused_value = windowed_value.value
       except (TypeError, ValueError):
         raise ValueError(
-            ('Input value to a stateful DoFn must be a KV tuple; instead, '
-             'got %s.') % (windowed_value.value,))
+            ('Input value to a stateful DoFn or KeyParam must be a KV tuple; '
+             'instead, got \'%s\'.') % (windowed_value.value,))
 
-    # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
     for i, p in self.placeholders:
       if p == core.DoFn.ElementParam:
         args_for_process[i] = windowed_value.value
+      elif p == core.DoFn.KeyParam:
+        args_for_process[i] = key
       elif p == core.DoFn.WindowParam:
         args_for_process[i] = window
       elif p == core.DoFn.TimestampParam:
diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py
index 18e2c45..9377708 100644
--- a/sdks/python/apache_beam/runners/common_test.py
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -19,7 +19,13 @@ from __future__ import absolute_import
 
 import unittest
 
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.common import DoFnSignature
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
 from apache_beam.transforms.core import DoFn
 
 
@@ -56,5 +62,58 @@ class DoFnSignatureTest(unittest.TestCase):
       DoFnSignature(MyDoFn())
 
 
+class DoFnProcessTest(unittest.TestCase):
+  # pylint: disable=expression-not-assigned
+  all_records = None
+
+  def setUp(self):
+    DoFnProcessTest.all_records = []
+
+  def record_dofn(self):
+    class RecordDoFn(DoFn):
+      def process(self, element):
+        DoFnProcessTest.all_records.append(element)
+
+    return RecordDoFn()
+
+  def test_dofn_process_keyparam(self):
+
+    class DoFnProcessWithKeyparam(DoFn):
+
+      def process(self, element, mykey=DoFn.KeyParam):
+        yield "{key}-verify".format(key=mykey)
+
+    pipeline_options = PipelineOptions()
+
+    with TestPipeline(options=pipeline_options) as p:
+      test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
+      (p
+       | test_stream
+       | beam.Map(lambda x: (x, "some-value"))
+       | "window_into" >> beam.WindowInto(
+           window.FixedWindows(5),
+           accumulation_mode=trigger.AccumulationMode.DISCARDING)
+       | beam.ParDo(DoFnProcessWithKeyparam())
+       | beam.ParDo(self.record_dofn()))
+
+    self.assertEqual(
+        ['1-verify', '2-verify'],
+        sorted(DoFnProcessTest.all_records))
+
+  def test_dofn_process_keyparam_error_no_key(self):
+    class DoFnProcessWithKeyparam(DoFn):
+
+      def process(self, element, mykey=DoFn.KeyParam):
+        yield "{key}-verify".format(key=mykey)
+
+    pipeline_options = PipelineOptions()
+    with self.assertRaises(ValueError),\
+         TestPipeline(options=pipeline_options) as p:
+      test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
+      (p
+       | test_stream
+       | beam.ParDo(DoFnProcessWithKeyparam()))
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ead094b..f51dc0f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -423,17 +423,19 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   PaneInfoParam = _DoFnParam('PaneInfoParam')
   WatermarkReporterParam = _DoFnParam('WatermarkReporterParam')
   BundleFinalizerParam = _BundleFinalizerParam
-
-  DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
-                       WindowParam, WatermarkReporterParam, PaneInfoParam,
-                       BundleFinalizerParam]
+  KeyParam = _DoFnParam('KeyParam')
 
   # Parameters to access state and timers.  Not restricted to use only in the
   # .process() method. Usage: DoFn.StateParam(state_spec),
-  # DoFn.TimerParam(timer_spec).
+  # DoFn.TimerParam(timer_spec), DoFn.TimestampParam, DoFn.WindowParam,
+  # DoFn.KeyParam
   StateParam = _StateDoFnParam
   TimerParam = _TimerDoFnParam
 
+  DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
+                       WindowParam, WatermarkReporterParam, PaneInfoParam,
+                       BundleFinalizerParam, KeyParam, StateParam, TimerParam]
+
   RestrictionParam = _RestrictionDoFnParam
 
   @staticmethod
@@ -460,6 +462,7 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     of the parameter.
     ``DoFn.StateParam``: a ``userstate.RuntimeState`` object defined by the spec
     of the parameter.
+    ``DoFn.KeyParam``: key associated with the element.
     ``DoFn.RestrictionParam``: an ``iobase.RestrictionTracker`` will be
     provided here to allow treatment as a Splittable ``DoFn``. The restriction
     tracker will be derived from the restriction provider in the parameter.
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 7a05c73..0d98337 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -66,6 +66,9 @@ class TestStatefulDoFn(DoFn):
 
   @on_timer(EXPIRY_TIMER_1)
   def on_expiry_1(self,
+                  window=DoFn.WindowParam,
+                  timestamp=DoFn.TimestampParam,
+                  key=DoFn.KeyParam,
                   buffer=DoFn.StateParam(BUFFER_STATE_1),
                   timer_1=DoFn.TimerParam(EXPIRY_TIMER_1),
                   timer_2=DoFn.TimerParam(EXPIRY_TIMER_2),
@@ -571,8 +574,10 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
       @on_timer(EMIT_TIMER_1)
       def emit_callback_1(self,
                           window=DoFn.WindowParam,
-                          ts=DoFn.TimestampParam):
-        yield ('timer1', int(ts), int(window.start), int(window.end))
+                          ts=DoFn.TimestampParam,
+                          key=DoFn.KeyParam):
+        yield ('timer1-{key}'.format(key=key),
+               int(ts), int(window.start), int(window.end))
 
     pipeline_options = PipelineOptions()
     with TestPipeline(options=pipeline_options) as p:
@@ -589,7 +594,7 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
        | beam.ParDo(self.record_dofn()))
 
     self.assertEqual(
-        [('timer1', 10, 10, 15)],
+        [('timer1-mykey', 10, 10, 15)],
         sorted(StatefulDoFnOnDirectRunnerTest.all_records))
 
   def test_index_assignment(self):


Mime
View raw message