spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yash datta <>
Subject Re: Dataframe Partitioning
Date Wed, 02 Mar 2016 04:30:46 GMT
This is  one of the most common problems we encounter in our flow. Mark, I
am happy to help if you would like to share some of the workload.


On Wednesday 2 March 2016, Mark Hamstra <> wrote:

> I don't entirely agree.  You're best off picking the right size :).
> That's almost impossible, though, since at the input end of the query
> processing you often want a large number of partitions to get sufficient
> parallelism for both performance and to avoid spilling or OOM, while at the
> output end of the query processing (after all the pruning and filtering)
> you often have only a few result rows, which means that splitting those few
> rows across many partitions in order to do a sort or similar is actually
> pretty silly and inefficient. I'll frequently see sorts where the
> per-partition sorts have only one or two records and it would have been
> quicker and more efficient to sort using a small number of partitions
> rather than using RangePartitioning to split the few rows across many
> partitions, then doing a degenerate/trivial form of sort on each of those
> partitions with their one or two rows, and finally merging all those tiny
> partitions back in order to produce the final results.
> Since the optimum number of shuffle partitions is different at different
> points in the query processing flow, it's really impossible to pick a
> static best number of shuffle partitions.  Using spark.sql.adaptive.enabled
> to turn on ExchangeCoordinator and dynamically set the number of shuffle
> partitions mostly works pretty well, but it still has at least a couple of
> issues.  One is that it makes things worse in the case of data skew since
> it doesn't stop coalescing partitions until after the coalesced partition
> size exceeds a target value; so if you've got some big ugly partitions that
> exceed the target size all on their own, they'll often be even bigger and
> uglier after the ExchangeCoordinator is done merging them with a few
> smaller partitions.  The other issue is that adaptive partitioning doesn't
> even try to do anything currently with any partitioning other than
> HashPartitioning, so you've still got the sorting problem using
> RangePartitioning that I just got done describing.
> I've actually started working on addressing each of those problems.
> On Tue, Mar 1, 2016 at 3:43 PM, Michael Armbrust <
> <javascript:_e(%7B%7D,'cvml','');>> wrote:
>> If you have to pick a number, its better to over estimate than
>> underestimate since task launching in spark is relatively cheap compared to
>> spilling to disk or OOMing (now much less likely due to Tungsten).
>> Eventually, we plan to make this dynamic, but you should tune for your
>> particular workload.
>> On Tue, Mar 1, 2016 at 3:19 PM, Teng Liao <
>> <javascript:_e(%7B%7D,'cvml','');>> wrote:
>>> Hi,
>>> I was wondering what the rationale behind defaulting all repartitioning
>>> to spark.sql.shuffle.partitions is. I’m seeing a huge overhead when running
>>> a job whose input partitions is 2 and, using the default value for
>>> spark.sql.shuffle.partitions, this is now 200. Thanks.
>>> -Teng Fei Liao

When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.

View raw message