beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
Date Tue, 28 Nov 2017 23:23:02 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269685#comment-16269685 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

udim commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153653669
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/util.py
 ##########
 @@ -423,3 +431,115 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      coder: coders.Coder object to be used on windows.
+    """
+    super(IdentityWindowFn, self).__init__()
+    if coder is None:
+      raise ValueError('coder should not be None')
+    self._coder = coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._coder
+
+class TriggerForEveryElement(TriggerFn):
+
+  def __repr__(self):
+    return 'TriggerForEveryElement'
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def on_element(self, element, window, context):
+    pass
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    # doesn't merge
+    pass
+
+  def should_fire(self, watermark, window, context):
+    return True
+
+  def on_fire(self, watermark, window, context):
+    return True
+
+  def reset(self, window, context):
+    pass
+
+  @staticmethod
+  def from_runner_api(unused_proto, unused_context):
+    return TriggerForEveryElement()
+
+  def to_runner_api(self, unused_context):
+    # TODO: add TriggerForEveryElement to proto
+    return beam_runner_api_pb2.Trigger(
+        element_count=beam_runner_api_pb2.Trigger.ElementCount(
+            element_count=0))
+
+
+# TODO(ehudm): compare with Java implementation for more edge cases.
+# TODO: are these typehints necessary?
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class Reshuffle(PTransform):
+  """TODO description
+
+  Reshuffle is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ExpandIterableDoFn(DoFn):
+      def process(self, element):
+        return [(element[0], value) for value in element[1]]
+
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+                timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        yield windowed_value.WindowedValue(
+            (element[0], element[1].value), element[1].timestamp, [window])
+
+    # TODO: is it safe to reapply this value?
+    windowing_saved = pcoll.windowing
+    # TODO: add .with_input_types, .with_output_types to PTransforms below?
 
 Review comment:
   > **robertwb** wrote:
   > It should be able to infer.
   
   Done.

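The Reshuffle expansion in the diff folds each element's timestamp into its value (ReifyTimestampsIn), regroups by key while IdentityWindowFn keeps the existing windows, then flattens each group (ExpandIterableDoFn) and restores the timestamps (ReifyTimestampsExtract). The following is a minimal pure-Python simulation of that data flow, not Beam code. It assumes a hypothetical `Element` tuple standing in for Beam's windowed values and hypothetical helper functions. It illustrates why the net effect should be to redistribute elements without changing their keys, values, timestamps, or windows. Timestamps are reified because grouping does not preserve per-element timestamps.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for a windowed (key, value) element; not a Beam class.
Element = namedtuple('Element', ['key', 'value', 'timestamp', 'window'])


def reify_timestamps(elements):
  """Fold each element's timestamp into its value (cf. ReifyTimestampsIn)."""
  for e in elements:
    yield e.key, (e.value, e.timestamp), e.window


def group_by_key_and_window(triples):
  """Group values per (key, window), as GroupByKey does when windows are kept."""
  groups = defaultdict(list)
  for key, value, window in triples:
    groups[(key, window)].append(value)
  return groups


def expand_and_restore(groups):
  """Flatten each group (cf. ExpandIterableDoFn), then restore each element's
  original timestamp and window (cf. ReifyTimestampsExtract)."""
  for (key, window), values in groups.items():
    for value, timestamp in values:
      yield Element(key, value, timestamp, window)


def simulate_reshuffle(elements):
  """Hypothetical end-to-end simulation of the Reshuffle data flow."""
  return list(expand_and_restore(
      group_by_key_and_window(reify_timestamps(elements))))


inputs = [
    Element('a', 1, 10, 'w0'),
    Element('b', 2, 11, 'w0'),
    Element('a', 3, 25, 'w1'),
]
# The multiset of elements is unchanged; only their grouping/order may differ.
assert sorted(simulate_reshuffle(inputs)) == sorted(inputs)
```

In a real pipeline, the grouping step is what forces the runner to redistribute data across workers. The simulation only checks the invariant that the transform is an identity on the elements themselves.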
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
>                 Key: BEAM-1872
>                 URL: https://issues.apache.org/jira/browse/BEAM-1872
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>              Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
