spark-issues mailing list archives

From "Sean Owen (JIRA)" <>
Subject [jira] [Resolved] (SPARK-6416) RDD.fold() requires the operator to be commutative
Date Sun, 03 Jan 2016 08:55:39 GMT


Sean Owen resolved SPARK-6416.
          Resolution: Fixed
            Assignee: Sean Owen
       Fix Version/s: 1.5.0
    Target Version/s:   (was: 2.0.0)

Hm! I should have tried that myself. I think that's a good argument that, at least, it's not
inconsistent. The behavior was documented by an earlier change, so I'm calling this resolved retroactively.

> RDD.fold() requires the operator to be commutative
> --------------------------------------------------
>                 Key: SPARK-6416
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation, Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Josh Rosen
>            Assignee: Sean Owen
>            Priority: Critical
>             Fix For: 1.5.0
> Spark's {{RDD.fold}} operation has some confusing behaviors when a non-commutative reduce
function is used.
> Here's an example, which was originally reported on StackOverflow:
> {code}
> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
> 8
> {code}
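[Editorial note: for contrast, a single-threaded left fold of the same operator over the same data simply counts the elements, which is presumably what the original poster expected. A minimal illustration with Python's `functools.reduce` (plain Python, no Spark required):]

```python
from functools import reduce

op = lambda a, b: a + 1        # the non-commutative operator from the example
data = [1, 25, 8, 4, 2]

# A plain left fold applies op once per element, so it counts them:
print(reduce(op, data, 0))     # 5
```

The Spark result of 8 instead tracks the number of partitions, as the quoted analysis below explains.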
> To understand what's going on here, let's look at the definition of Spark's `fold` operation.
> I'm going to show the Python version of the code, but the Scala version exhibits the
exact same behavior (you can also browse the source on GitHub):
> {code}
>     def fold(self, zeroValue, op):
>         """
>         Aggregate the elements of each partition, and then the results for all
>         the partitions, using a given associative function and a neutral "zero
>         value."
>         The function C{op(t1, t2)} is allowed to modify C{t1} and return it
>         as its result value to avoid object allocation; however, it should not
>         modify C{t2}.
>         >>> from operator import add
>         >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
>         15
>         """
>         def func(iterator):
>             acc = zeroValue
>             for obj in iterator:
>                 acc = op(obj, acc)
>             yield acc
>         vals = self.mapPartitions(func).collect()
>         return reduce(op, vals, zeroValue)
> {code}
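[Editorial note: to make the partition-sensitivity concrete, here is a minimal pure-Python simulation of the quoted two-level fold. `simulated_fold` and `split` are hypothetical names for illustration only; no Spark is required:]

```python
from functools import reduce

def simulated_fold(partitions, zero_value, op):
    """Mimic the quoted PySpark fold: fold each partition, then fold the results."""
    def fold_partition(iterator):
        acc = zero_value
        for obj in iterator:
            acc = op(obj, acc)   # same argument order as the quoted code
        return acc
    vals = [fold_partition(p) for p in partitions]
    # Driver-side fold: zero_value is folded in one extra time here.
    return reduce(op, vals, zero_value)

def split(xs, n):
    """Round-robin the data into n partitions (some may be empty)."""
    return [xs[i::n] for i in range(n)]

op = lambda a, b: a + 1
data = [1, 25, 8, 4, 2]
print(simulated_fold(split(data, 8), 0, op))   # 8 -- one increment per partition
```

With `op = lambda a, b: a + 1`, every per-partition result (including the `zero_value` yielded by each empty partition) contributes exactly one increment at the driver, so the answer is just the partition count.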
> (For comparison, see the Scala implementation of `RDD.fold`.)
> Spark's `fold` operates by first folding each partition and then folding the results.
 The problem is that an empty partition gets folded down to the zero element, so the final
driver-side fold ends up folding one value for _every_ partition rather than one value for
each _non-empty_ partition.  This means that the result of `fold` is sensitive to the number
of partitions:
> {code}
>     >>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
>     100
>     >>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
>     50
>     >>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
>     1
> {code}
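[Editorial note: with an operator that is both commutative and associative, the same two-level fold is insensitive to partitioning. A sketch, again simulating the fold in plain Python (`simulated_fold` is an illustrative name, not a Spark API):]

```python
from functools import reduce
from operator import add

def simulated_fold(partitions, zero_value, op):
    # Same two-level fold shape as the quoted implementation (illustrative only).
    vals = [reduce(op, p, zero_value) for p in partitions]
    return reduce(op, vals, zero_value)

data = [1, 25, 8, 4, 2]          # true sum: 40
for n in (1, 5, 50, 100):
    parts = [data[i::n] for i in range(n)]
    assert simulated_fold(parts, 0, add) == 40   # partition count is irrelevant
print("sum is 40 for every partitioning")
```

Empty partitions fold to 0, which is neutral for addition, so the extra zero elements never change the answer.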
> In this last case, what's happening is that the single partition is being folded down
to the correct value, then that value is folded with the zero-value at the driver to yield 1.
> I think the underlying problem here is that our fold() operation implicitly requires
the operator to be commutative in addition to associative, but this isn't documented anywhere.
 Due to ordering non-determinism elsewhere in Spark, such as SPARK-5750, I don't think there's
an easy way to fix this.  Therefore, I think we should update the documentation and examples
to clarify this requirement and explain that our fold acts more like a reduce with a default
value than the type of ordering-sensitive fold() that users may expect in functional languages.
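[Editorial note: a related caveat worth documenting alongside this: because the zero value is folded in once per partition plus once more on the driver, it must be a neutral element for the operator (0 for addition, 1 for multiplication, and so on). A non-neutral zero skews the result by a partition-dependent amount. A sketch simulating the fold in plain Python (`simulated_fold` is an illustrative name, not a Spark API):]

```python
from functools import reduce
from operator import add

def simulated_fold(partitions, zero_value, op):
    # Two-level fold, as in the quoted implementation (illustrative only).
    vals = [reduce(op, p, zero_value) for p in partitions]
    return reduce(op, vals, zero_value)

data = [1, 25, 8, 4, 2]                # true sum: 40
parts = [data[i::3] for i in range(3)]
print(simulated_fold(parts, 0, add))   # 40: zero is neutral for addition
print(simulated_fold(parts, 1, add))   # 44: 40 + one extra 1 per partition + one on the driver
```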

This message was sent by Atlassian JIRA
