spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mark Hamstra (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-6416) RDD.fold() requires the operator to be commutative
Date Fri, 01 Jan 2016 19:23:39 GMT

    [ https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15076367#comment-15076367 ]

Mark Hamstra commented on SPARK-6416:
-------------------------------------

I don't see any reason to change the API wrt `fold`.  With operations on RDDs, we generally
try to achieve the same semantics as for Scala parallel collections, and that does hold true
for `fold`:

  scala> val list = (1 to 10000).toList

  scala> list.fold(0)(_ + _)
  res0: Int = 50005000

  scala> list.par.fold(0)(_ + _)
  res1: Int = 50005000

  scala> list.fold(1)(_ + _)
  res2: Int = 50005001

  scala> list.par.fold(1)(_ + _)
  res3: Int = 50005039
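
The same semantics carry over to RDDs. As a rough spark-shell sketch (assuming a local SparkContext `sc` and an explicit partition count of 4), a non-neutral zero value gets folded in once per partition plus once more in the driver-side combine, so the extra amount depends on the partitioning:

  scala> sc.parallelize(1 to 10000, 4).fold(0)(_ + _)
  res0: Int = 50005000

  scala> sc.parallelize(1 to 10000, 4).fold(1)(_ + _)
  res1: Int = 50005005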


If we need to change anything, it would simply be to change our API documentation to more
closely match that of the Scala Standard Library, where the first argument to `fold` is described
as: "a neutral element for the fold operation, it may be added to the result an arbitrary
number of times, not changing the result (e.g. Nil for list concatenation, 0 for addition,
or 1 for multiplication)".

> RDD.fold() requires the operator to be commutative
> --------------------------------------------------
>
>                 Key: SPARK-6416
>                 URL: https://issues.apache.org/jira/browse/SPARK-6416
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation, Spark Core
>            Reporter: Josh Rosen
>            Priority: Critical
>
> Spark's {{RDD.fold}} operation has some confusing behaviors when a non-commutative reduce
> function is used.
> Here's an example, which was originally reported on StackOverflow (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):
> {code}
> sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
> 8
> {code}
> To understand what's going on here, let's look at the definition of Spark's `fold` operation.
> I'm going to show the Python version of the code, but the Scala version exhibits the
> exact same behavior (you can also [browse the source on GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]):
> {code}
>     def fold(self, zeroValue, op):
>         """
>         Aggregate the elements of each partition, and then the results for all
>         the partitions, using a given associative function and a neutral "zero
>         value."
>         The function C{op(t1, t2)} is allowed to modify C{t1} and return it
>         as its result value to avoid object allocation; however, it should not
>         modify C{t2}.
>         >>> from operator import add
>         >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
>         15
>         """
>         def func(iterator):
>             acc = zeroValue
>             for obj in iterator:
>                 acc = op(obj, acc)
>             yield acc
>         vals = self.mapPartitions(func).collect()
>         return reduce(op, vals, zeroValue)
> {code}
> (For comparison, see the [Scala implementation of `RDD.fold`|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943]).
> Spark's `fold` operates by first folding each partition and then folding the results.
> The problem is that an empty partition gets folded down to the zero element, so the final
> driver-side fold ends up folding one value for _every_ partition rather than one value for
> each _non-empty_ partition.  This means that the result of `fold` is sensitive to the number
> of partitions:
> {code}
>     >>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
>     100
>     >>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
>     50
>     >>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
>     1
> {code}
> In this last case, what's happening is that the single partition is being folded down
> to the correct value, then that value is folded with the zero-value at the driver to yield 1.
> I think the underlying problem here is that our fold() operation implicitly requires
> the operator to be commutative in addition to associative, but this isn't documented anywhere.
> Due to ordering non-determinism elsewhere in Spark, such as SPARK-5750, I don't think there's
> an easy way to fix this.  Therefore, I think we should update the documentation and examples
> to clarify this requirement and explain that our fold acts more like a reduce with a default
> value than the type of ordering-sensitive fold() that users may expect in functional languages.
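> As an illustrative sketch of that "reduce with a default value" framing (assuming a local SparkContext {{sc}}): with an operator that is both commutative and associative, and a zero value that is genuinely neutral for it, the result does not depend on the partitioning, unlike the counting lambda above:
> {code}
>     >>> from operator import add
>     >>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, add)
>     40
>     >>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, add)
>     40
>     >>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, add)
>     40
> {code}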


