spark-issues mailing list archives

From "Madhu Siddalingaiah (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-983) Support external sorting for RDD#sortByKey()
Date Fri, 23 May 2014 00:52:01 GMT

    [ https://issues.apache.org/jira/browse/SPARK-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14006664#comment-14006664 ]

Madhu Siddalingaiah commented on SPARK-983:
-------------------------------------------

Looking at [OrderedRDDFunctions|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala],
there's a shuffle step using RangePartitioner, then an in-memory sort of each partition by
key. If we separate the partition sort and make that available as an independent API call,
it could serve two purposes: sortByKey() and sortPartitions(). Then we could improve sortPartitions()
to fall back to disk like [ExternalAppendOnlyMap|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala].
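
As a rough illustration of the idea above (not Spark code, and not the eventual implementation), here is a plain-Python sketch of a partition sort that spills sorted runs to disk and merges them lazily. The names sort_partition and max_in_memory are hypothetical, and a simple record-count threshold stands in for real memory-pressure detection.

```python
import heapq
import pickle
import tempfile


def _spill(sorted_run):
    """Write one sorted run to a temporary file and return the open file."""
    f = tempfile.TemporaryFile()
    for record in sorted_run:
        pickle.dump(record, f)
    f.seek(0)
    return f


def _read_run(f):
    """Yield records back from a spilled run, closing the file at the end."""
    try:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return
    finally:
        f.close()


def sort_partition(records, key=lambda kv: kv[0], max_in_memory=10000):
    """Sort an iterator of records, spilling sorted runs to disk as needed.

    max_in_memory is a hypothetical record-count limit used here in place
    of actual memory-pressure detection.
    """
    runs = []
    buffer = []
    for record in records:
        buffer.append(record)
        if len(buffer) >= max_in_memory:
            # Buffer is "full": sort it, spill it, and start a new run.
            buffer.sort(key=key)
            runs.append(_read_run(_spill(buffer)))
            buffer = []
    # The last (possibly empty) run stays in memory.
    buffer.sort(key=key)
    runs.append(iter(buffer))
    # Lazy k-way merge of all sorted runs.
    return heapq.merge(*runs, key=key)
```

With a small max_in_memory, list(sort_partition(iter(pairs), max_in_memory=3)) returns the pairs ordered by key while holding only a few records per run in memory at a time. The same function could in principle back both sortByKey() (after the range-partitioned shuffle) and a standalone sortPartitions().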

The above approach would address this JIRA issue and support the equivalent of Hadoop secondary
sort in a scalable way. There are plenty of time-series-like use cases that could benefit
from it. There's a lot more to it, but I'll code something up locally and see how it goes...
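
For anyone unfamiliar with secondary sort, a minimal plain-Python sketch of the pattern follows (again not Spark code). Records are partitioned by the primary key only, then each partition is sorted by the composite (key, timestamp), so every key's values come out in time order. The function name, the record layout (key, timestamp, value), and num_partitions are assumptions made for illustration; the per-partition sort here is in memory.

```python
from collections import defaultdict


def secondary_sort(records, num_partitions=4):
    """Partition by key, then sort each partition by (key, timestamp).

    records is an iterable of (key, timestamp, value) tuples; this layout
    is a hypothetical one chosen for the example.
    """
    partitions = defaultdict(list)
    for key, ts, value in records:
        # Partition on the primary key only, so one key never straddles
        # two partitions.
        partitions[hash(key) % num_partitions].append((key, ts, value))
    result = []
    for pid in sorted(partitions):
        part = partitions[pid]
        # Sort on the composite key: primary key first, then timestamp.
        part.sort(key=lambda r: (r[0], r[1]))
        result.append(part)
    return result
```

Within each returned partition, all records for a given key are contiguous and ordered by timestamp, which is what a time-series consumer typically wants.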

> Support external sorting for RDD#sortByKey()
> --------------------------------------------
>
>                 Key: SPARK-983
>                 URL: https://issues.apache.org/jira/browse/SPARK-983
>             Project: Spark
>          Issue Type: New Feature
>    Affects Versions: 0.9.0
>            Reporter: Reynold Xin
>
> Currently, RDD#sortByKey() is implemented by a mapPartitions which creates a buffer to
> hold the entire partition, then sorts it. This will cause an OOM if an entire partition cannot
> fit in memory, which is especially problematic for skewed data. Rather than OOMing, the behavior
> should be similar to the [ExternalAppendOnlyMap|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala],
> where we fall back to disk if we detect memory pressure.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
