beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [beam] branch master updated: Update spec on CombineFn and DoFn to clarify mutability of parameters
Date Mon, 01 Apr 2019 21:10:42 GMT
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5caf0f2  Update spec on CombineFn and DoFn to clarify mutability of parameters
     new 08a5fdf  Merge #8134: [BEAM-6906] Update spec on CombineFn and DoFn to clarify mutability of parameters
5caf0f2 is described below

commit 5caf0f2f43ddb8185c602807ea19fc182ebded40
Author: Yueyang Qiu <robinyqiu@gmail.com>
AuthorDate: Mon Mar 25 14:17:10 2019 -0700

    Update spec on CombineFn and DoFn to clarify mutability of parameters
---
 .../org/apache/beam/sdk/transforms/Combine.java    | 10 ++++----
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |  1 +
 sdks/python/apache_beam/transforms/core.py         | 27 ++++++++++++++--------
 3 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 2c54466..54e6f08 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -341,16 +341,18 @@ public class Combine {
     /**
      * Adds the given input value to the given accumulator, returning the new accumulator value.
      *
-     * <p>For efficiency, the input accumulator may be modified and returned.
+     * @param mutableAccumulator may be modified and returned for efficiency
+     * @param input should not be mutated
      */
-    public abstract AccumT addInput(AccumT accumulator, InputT input);
+    public abstract AccumT addInput(AccumT mutableAccumulator, InputT input);
 
     /**
      * Returns an accumulator representing the accumulation of all the input values accumulated in
      * the merging accumulators.
      *
-     * <p>May modify any of the argument accumulators. May return a fresh accumulator, or may return
-     * one of the (modified) argument accumulators.
+     * @param accumulators only the first accumulator may be modified and returned for efficiency;
+     *     the other accumulators should not be mutated, because they may be shared with other code
+     *     and mutating them could lead to incorrect results or data corruption.
      */
     public abstract AccumT mergeAccumulators(Iterable<AccumT> accumulators);
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index e3c0604..825966a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -232,6 +232,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
      * Returns the input element to be processed.
      *
      * <p>The element will not be changed -- it is safe to cache, etc. without copying.
+     * Implementation of {@link DoFn.ProcessElement} method should not mutate the element.
      */
     public abstract InputT element();
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 1d095e8..a445152 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -398,7 +398,7 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     If specified, following default arguments are used by the ``DoFnRunner`` to
     be able to pass the parameters correctly.
 
-    ``DoFn.ElementParam``: element to be processed.
+    ``DoFn.ElementParam``: element to be processed, should not be mutated.
     ``DoFn.SideInputParam``: a side input that may be used when processing.
     ``DoFn.TimestampParam``: timestamp of the input element.
     ``DoFn.WindowParam``: ``Window`` the input element belongs to.
@@ -597,20 +597,21 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     """
     raise NotImplementedError(str(self))
 
-  def add_input(self, accumulator, element, *args, **kwargs):
+  def add_input(self, mutable_accumulator, element, *args, **kwargs):
     """Return result of folding element into accumulator.
 
     CombineFn implementors must override add_input.
 
     Args:
-      accumulator: the current accumulator
-      element: the element to add
+      mutable_accumulator: the current accumulator,
+        may be modified and returned for efficiency
+      element: the element to add, should not be mutated
       *args: Additional arguments and side inputs.
       **kwargs: Additional arguments and side inputs.
     """
     raise NotImplementedError(str(self))
 
-  def add_inputs(self, accumulator, elements, *args, **kwargs):
+  def add_inputs(self, mutable_accumulator, elements, *args, **kwargs):
     """Returns the result of folding each element in elements into accumulator.
 
     This is provided in case the implementation affords more efficient
@@ -618,21 +619,27 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     over the inputs invoking add_input for each one.
 
     Args:
-      accumulator: the current accumulator
-      elements: the elements to add
+      mutable_accumulator: the current accumulator,
+        may be modified and returned for efficiency
+      elements: the elements to add, should not be mutated
       *args: Additional arguments and side inputs.
       **kwargs: Additional arguments and side inputs.
     """
     for element in elements:
-      accumulator = self.add_input(accumulator, element, *args, **kwargs)
-    return accumulator
+      mutable_accumulator =\
+        self.add_input(mutable_accumulator, element, *args, **kwargs)
+    return mutable_accumulator
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
     """Returns the result of merging several accumulators
     to a single accumulator value.
 
     Args:
-      accumulators: the accumulators to merge
+      accumulators: the accumulators to merge.
+        Only the first accumulator may be modified and returned for efficiency;
+        the other accumulators should not be mutated, because they may be
+        shared with other code and mutating them could lead to incorrect
+        results or data corruption.
       *args: Additional arguments and side inputs.
       **kwargs: Additional arguments and side inputs.
     """


Mime
View raw message