spark-dev mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: Dataframe Partitioning
Date Wed, 02 Mar 2016 00:15:57 GMT
I don't entirely agree.  You're best off picking the right size :).  That's
almost impossible, though.  At the input end of the query processing you
often want a large number of partitions, both to get sufficient parallelism
for performance and to avoid spilling or OOM.  At the output end (after all
the pruning and filtering) you often have only a few result rows, so
splitting those few rows across many partitions in order to do a sort or
similar is pretty silly and inefficient.  I frequently see sorts where each
per-partition sort has only one or two records.  It would have been quicker
and more efficient to sort using a small number of partitions than to use
RangePartitioning to split the few rows across many partitions, do a
degenerate/trivial sort on each of those one- or two-row partitions, and
finally merge all those tiny partitions back in order to produce the final
results.
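
To make the tiny-partition sort concrete, here is a minimal pure-Python
sketch. It is an assumption-laden model, not Spark's RangePartitioner: the
helper names (range_partition, partitioned_sort) and the way boundaries are
picked from the sorted keys are illustrative only. It shows five rows being
range-partitioned for a requested 200 partitions, sorted per partition, and
concatenated in order.

```python
from bisect import bisect_left


def range_partition(keys, num_partitions):
    """Split keys into ordered buckets (simplified model of range partitioning).

    Boundaries are sampled from the sorted keys; duplicates collapse, so the
    number of buckets can be far smaller than num_partitions.
    """
    if not keys:
        return [[]]
    ordered = sorted(keys)
    n = len(ordered)
    # Up to num_partitions - 1 distinct upper bounds, one per bucket boundary.
    bounds = sorted({ordered[i * n // num_partitions]
                     for i in range(1, num_partitions)})
    buckets = [[] for _ in range(len(bounds) + 1)]
    for k in keys:
        # Bucket j receives keys in (bounds[j-1], bounds[j]].
        buckets[bisect_left(bounds, k)].append(k)
    return buckets


def partitioned_sort(keys, num_partitions):
    """Sort each bucket independently, then concatenate buckets in order."""
    sorted_buckets = [sorted(b) for b in range_partition(keys, num_partitions)]
    merged = [k for b in sorted_buckets for k in b]
    return merged, sorted_buckets


rows = [42, 7, 19, 3, 88]  # a handful of result rows after filtering
merged_many, buckets_many = partitioned_sort(rows, 200)
merged_one, buckets_one = partitioned_sort(rows, 1)

print([len(b) for b in buckets_many])  # per-bucket row counts
print(merged_many == merged_one)
```

In this model no bucket holds more than one row when 200 partitions are
requested, so every per-partition sort is trivial, while a single partition
produces the same ordered result with one small sort.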

Since the optimum number of shuffle partitions is different at different
points in the query processing flow, it's really impossible to pick a
static best number of shuffle partitions.  Using spark.sql.adaptive.enabled
to turn on ExchangeCoordinator and dynamically set the number of shuffle
partitions mostly works pretty well, but it still has at least a couple of
issues.  One is that it makes things worse in the case of data skew since
it doesn't stop coalescing partitions until after the coalesced partition
size exceeds a target value; so if you've got some big ugly partitions that
exceed the target size all on their own, they'll often be even bigger and
uglier after the ExchangeCoordinator is done merging them with a few
smaller partitions.  The other issue is that adaptive partitioning currently
doesn't even try to do anything with any partitioning other than
HashPartitioning, so you've still got the sorting problem with
RangePartitioning that I just described.
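
The skew problem can be illustrated with a short pure-Python sketch of the
coalescing rule as described above: keep adding adjacent partitions until
the running size exceeds the target, then close the group. This is only a
model of that description, not the ExchangeCoordinator source; the function
name coalesce and the sizes and target used are hypothetical.

```python
def coalesce(sizes, target):
    """Greedily merge adjacent partitions.

    A group is closed only after its accumulated size exceeds `target`,
    which mirrors the behavior described in the text (an assumption, not
    a copy of Spark's implementation).
    """
    groups = []
    current = []
    current_size = 0
    for size in sizes:
        current.append(size)
        current_size += size
        if current_size > target:
            groups.append(current)
            current, current_size = [], 0
    if current:
        # Whatever is left over forms the final, possibly undersized, group.
        groups.append(current)
    return groups


# One skewed partition (500) surrounded by small ones; sizes in arbitrary units.
sizes = [10, 10, 500, 10, 10]
groups = coalesce(sizes, target=64)
totals = [sum(g) for g in groups]

print(groups)
print(totals)
```

Here the partition of size 500 already exceeds the target on its own, yet
it ends up in a group of size 520 because the two small partitions before
it were still accumulating when it arrived.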

I've actually started working on addressing each of those problems.

On Tue, Mar 1, 2016 at 3:43 PM, Michael Armbrust <michael@databricks.com>
wrote:

> If you have to pick a number, it's better to overestimate than
> underestimate, since task launching in Spark is relatively cheap compared to
> spilling to disk or OOMing (now much less likely due to Tungsten).
> Eventually, we plan to make this dynamic, but you should tune for your
> particular workload.
>
> On Tue, Mar 1, 2016 at 3:19 PM, Teng Liao <tliao@palantir.com> wrote:
>
>> Hi,
>>
>> I was wondering what the rationale is behind defaulting all repartitioning
>> to spark.sql.shuffle.partitions. I’m seeing a huge overhead when running
>> a job that has 2 input partitions; with the default value for
>> spark.sql.shuffle.partitions, it now has 200. Thanks.
>>
>> -Teng Fei Liao
>>
>
>
