spark-user mailing list archives

From Andrew Or <and...@databricks.com>
Subject Re: Aggregate order semantics when spilling
Date Tue, 20 Jan 2015 19:03:32 GMT
Hi Justin,

I believe the intended semantics of groupByKey and cogroup are that the
ordering *within a key* is not preserved if you spill. In fact, the test
cases for the ExternalAppendOnlyMap only assert that the Set representation
of the results is as expected (see this line
<https://github.com/apache/spark/blob/9d9294aebf7208a76f43d8fc5a0489a83d7215f4/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala#L265>).
This is because these Spark primitives simply group the values by key
and do not provide any ordering guarantees.

However, if ordering within a key is a requirement for your application,
then you may need to write your own PairRDDFunction that calls
combineByKey. You can model your method after groupByKey, but change the
combiner function slightly to take ordering into account. This may add some
overhead to your application, since every value has to be inserted in the
appropriate place, but since you're spilling anyway the overhead will
likely be overshadowed by disk I/O.
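
To make the idea concrete, here is a minimal sketch of order-aware combiner
functions, written in plain Python so that it runs without a cluster. It
assumes each value is a (timestamp, payload) tuple; the function names and
the sample data are made up for illustration and are not part of Spark.

```python
import bisect
import heapq
from functools import reduce


def create_combiner(value):
    # Start a new combiner: a list that is kept sorted by timestamp.
    return [value]


def merge_value(combiner, value):
    # Insert the value at its sorted position (ties go after existing values).
    timestamps = [v[0] for v in combiner]
    combiner.insert(bisect.bisect_right(timestamps, value[0]), value)
    return combiner


def merge_combiners(left, right):
    # Two-way merge of two sorted combiners. The result is the same
    # whichever spilled combiner happens to be merged first.
    return list(heapq.merge(left, right, key=lambda v: v[0]))


# Hypothetical partial combiners for key "A", as if spilled separately.
spill_1 = reduce(merge_value, [(2, "b")], create_combiner((1, "a")))
spill_2 = reduce(merge_value, [(4, "d")], create_combiner((3, "c")))

# Even when merged in the "wrong" order, the result stays sorted.
merged = merge_combiners(spill_2, spill_1)
```

With PySpark, the three functions could be passed as
rdd.combineByKey(create_combiner, merge_value, merge_combiners); in Scala the
same shape applies to PairRDDFunctions.combineByKey. The list insert is O(n)
per value, which is the overhead mentioned above.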

Let me know if that works.
-Andrew


2015-01-20 9:18 GMT-08:00 Justin Uang <justin.uang@gmail.com>:

> Hi,
>
> I am trying to aggregate a key based on some timestamp, and I believe that
> spilling to disk is changing the order of the data fed into the combiner.
>
> I have some timeseries data that is of the form: ("key", "date", "other
> data")
>
>     Partition 1
>     ("A", 2, ...)
>     ("B", 4, ...)
>     ("A", 1, ...)
>     ("A", 3, ...)
>     ("B", 6, ...)
>
> which I then partition by key, then sort within the partition:
>
>     Partition 1
>     ("A", 1, ...)
>     ("A", 2, ...)
>     ("A", 3, ...)
>     ("A", 4, ...)
>
>     Partition 2
>     ("B", 4, ...)
>     ("B", 6, ...)
>
> If I run a combineByKey with the same partitioner, then the items for each
> key will be fed into the ExternalAppendOnlyMap in the correct order.
> However, if I spill, then the time slices are spilled to disk as multiple
> partial combiners. When it's time to merge the spilled combiners for each
> key, the combiners are combined in the wrong order.
>
> For example, if during a groupByKey, [("A", 1, ...), ("A", 2, ...)] and
> [("A", 3, ...), ("A", 4, ...)] are spilled separately, it's possible that
> the combiners can be combined in the wrong order, like [("A", 3, ...),
> ("A", 4, ...), ("A", 1, ...), ("A", 2, ...)], which invalidates the
> invariant that all the values for A are passed in order to the combiners.
>
> I'm not an expert, but I suspect that this is because we use a heap
> ordered by key when iterating, which doesn't retain the order of the spilled
> combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
> where spill_index is incremented each time we spill? This would mean that
> we would pop and merge the combiners of each key in order, resulting in
> [("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
>
> Thanks in advance for the help! If there is a way to do this already in
> Spark 1.2, can someone point it out to me?
>
> Best,
>
> Justin
>
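
For reference, the (hash_key, spill_index) ordering proposed in the quoted
message can be simulated in plain Python. This is only a sketch under
simplifying assumptions, not how ExternalAppendOnlyMap is implemented: each
spill is modelled as a dict from key to partial combiner, the key itself
stands in for its hash, and merge_spilled_combiners is a hypothetical name.

```python
import heapq


def merge_spilled_combiners(spills):
    """Merge partial combiners, popping them in (key, spill_index) order.

    `spills` is a list of dicts in the order they were spilled; each dict
    maps a key to the list of values combined before that spill.
    """
    heap = []
    for spill_index, spill in enumerate(spills):
        for key, combiner in spill.items():
            # (key, spill_index) is unique, so the combiner is never compared.
            heapq.heappush(heap, (key, spill_index, combiner))

    merged = {}
    while heap:
        key, _, combiner = heapq.heappop(heap)
        # Combiners for one key arrive in spill order, so simple
        # concatenation preserves the original value order.
        merged.setdefault(key, []).extend(combiner)
    return merged


# Sample spills mirroring the example in the quoted message.
spills = [{"A": [1, 2], "B": [4]}, {"A": [3, 4], "B": [6]}]
in_order = merge_spilled_combiners(spills)

# If the spill order is lost (simulated here by reversing the list), the
# values for "A" come out as [3, 4, 1, 2], which is the reported problem.
out_of_order = merge_spilled_combiners(list(reversed(spills)))
```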
