spark-dev mailing list archives

From John Carrino <john.carr...@gmail.com>
Subject Possible space improvements to shuffle
Date Tue, 02 Jun 2015 20:50:47 GMT
One thing I have noticed with ExternalSorter is that if an ordering is not
defined, it sorts using only the partition_id instead of
(partition_id, hash).  This means that on the reduce side you need to pull
the entire dataset into memory before you can begin iterating over the
results.

Since we are sorting the data anyway, it doesn't seem more expensive to
sort by (partition_id, hash).  That way the reducer can do a merge and only
has to hold in memory the data for a single int hashCode before it can
combine them and start returning results from the iterator.
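
To make the idea concrete, here is a rough sketch in plain Scala, not Spark code. The names HashGroupedMerge, Record, sortByPartitionAndHash and combineSorted are made up for illustration, and summing Int values stands in for whatever the aggregator would do. It assumes the input is already sorted by (partition_id, hash), and it buffers only the records that share one hash code within a partition.

```scala
object HashGroupedMerge {
  // Hypothetical record layout: (partitionId, key, value).
  type Record = (Int, String, Int)

  // Sort by (partitionId, key.hashCode) rather than by partitionId alone.
  def sortByPartitionAndHash(records: Seq[Record]): Seq[Record] =
    records.sortBy { case (pid, key, _) => (pid, key.hashCode) }

  // Walk input that is already sorted by (partitionId, hashCode) and sum the
  // values per key, holding only one (partitionId, hashCode) run at a time.
  def combineSorted(sorted: Iterator[Record]): Iterator[Record] = {
    val in = sorted.buffered
    val runs = new Iterator[Seq[Record]] {
      def hasNext: Boolean = in.hasNext
      def next(): Seq[Record] = {
        val (pid, firstKey, _) = in.head
        val hash = firstKey.hashCode
        // Different keys can share a hash code, so combine per key in the run.
        val sums = scala.collection.mutable.LinkedHashMap.empty[String, Int]
        while (in.hasNext && in.head._1 == pid && in.head._2.hashCode == hash) {
          val (_, key, value) = in.next()
          sums(key) = sums.getOrElse(key, 0) + value
        }
        sums.toSeq.map { case (key, sum) => (pid, key, sum) }
      }
    }
    runs.flatMap(run => run)
  }
}
```

Results come out as soon as one hash run is finished, so the memory needed is bounded by the largest run instead of by the whole partition.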

Has this already been discussed?  If so, can someone point me in the right
direction to find out more?

Thanks for any help!
-jc

p.s. I am using Spark version 1.3.1.  The code I am looking at below is
from ExternalSorter#partitionedIterator.  I think maybe the
!ordering.isDefined check should also include "&& !aggregator.isDefined"

    if (spills.isEmpty && partitionWriters == null) {
      // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
      // we don't even need to sort by anything other than partition ID
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(collection.destructiveSortedIterator(partitionComparator))
      } else {
        // We do need to sort by both partition ID and key
        groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator))
      }
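
The change I have in mind could look roughly like the standalone sketch below. This is not the ExternalSorter source: ComparatorChoice, hashComparator, partitionOnly, partitionThenKey and choose are hypothetical names, and the aggregator is reduced to a Boolean flag. The only point is the condition in choose, which falls back to partition-only sorting when neither an ordering nor an aggregator is defined.

```scala
import java.util.Comparator

object ComparatorChoice {
  // Orders keys by hash code; null keys are treated as hash 0.
  def hashComparator[K]: Comparator[K] = new Comparator[K] {
    private def hash(k: K): Int = if (k == null) 0 else k.hashCode()
    override def compare(a: K, b: K): Int = Integer.compare(hash(a), hash(b))
  }

  // Orders (partitionId, key) pairs by partition ID only.
  def partitionOnly[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int =
      Integer.compare(a._1, b._1)
  }

  // Orders by partition ID first, then by the given key comparator.
  def partitionThenKey[K](keyCmp: Comparator[K]): Comparator[(Int, K)] =
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val byPartition = Integer.compare(a._1, b._1)
        if (byPartition != 0) byPartition else keyCmp.compare(a._2, b._2)
      }
    }

  // Proposed rule (an assumption, not existing behavior): sort by partition
  // alone only when there is neither an ordering nor an aggregator.
  def choose[K](ordering: Option[Comparator[K]],
                aggregatorDefined: Boolean): Comparator[(Int, K)] =
    if (ordering.isEmpty && !aggregatorDefined) partitionOnly[K]
    else partitionThenKey(ordering.getOrElse(hashComparator[K]))
}
```

With an aggregator and no ordering, choose returns a comparator that also orders by key hash, which is what the reduce-side merge would need.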
