spark-dev mailing list archives

From pnpranavrao
Subject [Spark SQL Discuss] Better support for Partitioning and Bucketing when used together
Date Thu, 31 May 2018 17:04:19 GMT
We use partitioned + bucketed datasets for use-cases where we can afford to
take a perf hit at write time, so that reads are optimised. But I feel Spark
could more optimally exploit the data layout in query planning. Here I
describe why this is a problem, and how it could be improved.

Why is Partitioning + Bucketing required (together)?
There is a class of common problems that can't be solved by either technique
alone:
- Pure partitioning: We want to avoid a shuffle on some commonly joined
  datasets, on the same few join keys. This can't be solved by pure
  partitioning.
- Pure bucketing: For most DataSources (on Spark and other processing
  frameworks), the folder is the least granular level of identifying datasets.
  The HiveMetastore lets us collect arbitrary folder partitions into a logical
  view, and this helps in incremental ingestion and lends itself to a simple
  form of MVCC.
On Spark, when you try to both Partition and Bucket a dataset, the format on
disk and in the metastore is correctly recorded. But this information isn't
optimally used for query planning because:
- A partitioned+bucketed dataset is read into num_buckets input RDD partitions
  due to createBucketedRDD. For large datasets with a lot of folder partitions,
  the resulting DataFrame is effectively unusable because of the severely
  limited parallelism. We can't choose a large num_buckets either, as that
  would lead to small-file problems, especially in skewed partitions.
- We could manually turn bucketing off with the
  spark.sql.sources.bucketing.enabled flag, but we would then lose the natural
  distribution that's present in the dataset, and lose out on shuffle
  avoidance.
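
To make the parallelism problem concrete, here is a small pure-Python model of the grouping (this is not Spark's code; the function name and the file layout are made up for illustration). It assumes, as described above, that a bucketed read produces one input partition per bucket id, regardless of how many folder partitions exist:

```python
from collections import defaultdict

def bucketed_read_partitions(files, num_buckets):
    """Group data files into read partitions by bucket id alone.

    `files` is a list of (partition_dir, bucket_id) pairs. The result always
    has exactly `num_buckets` entries, however many folder partitions exist.
    """
    groups = defaultdict(list)
    for partition_dir, bucket_id in files:
        groups[bucket_id].append(partition_dir)
    return [groups[b] for b in range(num_buckets)]

# Hypothetical layout: 100 daily folder partitions, 8 buckets in each.
num_buckets = 8
files = [(f"dt={day:03d}", b) for day in range(100) for b in range(num_buckets)]

read_partitions = bucketed_read_partitions(files, num_buckets)
print(len(files))            # 800 files on disk
print(len(read_partitions))  # 8 read partitions, each covering 100 files
```

Adding more folder partitions only makes each of the 8 read partitions larger; the task count stays fixed at num_buckets.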

What could happen ideally:
- Partitioned + Bucketed data actually has a well-defined distribution. It
  does not fit the currently defined HashClusteredDistribution, but a new one
  can be defined which takes into account both the value-based distribution of
  partition values and the hash-based distribution of bucketing columns.
- Queries involving partition columns AND bucketing columns should make use of
  this data distribution. E.g.: Suppose you have PartitioningCols(a,b) and
  BucketingCols(c); the joins we can support without shuffle would be on keys
  (a,b,c), (a,c) [coalesce values of b into a] and (a) [Partition-Partition
  join].
- The number of input RDD partitions should be decided at the last possible
  stage of query planning. If no join (as described above) can utilize the
  bucketed data, the physical plan could fall back to a regular DataSource
  scan.
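
As a rough sketch of the second point, the eligibility check could look like the following. The rule encoded here is my own reading of the example above and is an assumption, not existing Spark behaviour: the join keys must be a non-empty leading prefix of the partition columns, optionally followed by all of the bucketing columns. The function name is hypothetical.

```python
def shuffle_free_join_plan(partition_cols, bucket_cols, join_keys):
    """Decide whether a join on `join_keys` could avoid a shuffle.

    Returns (cols_to_coalesce, uses_buckets), or None if a shuffle is needed.
    Assumed rule (not Spark's): keys = leading prefix of the partition
    columns, optionally followed by all of the bucketing columns.
    """
    keys = list(join_keys)
    n = len(bucket_cols)
    # Split off the bucketing columns if the keys end with them.
    uses_buckets = n > 0 and keys[-n:] == list(bucket_cols)
    if uses_buckets:
        keys = keys[:-n]
    # What remains must be a non-empty leading prefix of the partition columns.
    if not keys or keys != list(partition_cols[:len(keys)]):
        return None
    # Partition columns not in the join keys get coalesced away.
    return list(partition_cols[len(keys):]), uses_buckets

partition_cols, bucket_cols = ("a", "b"), ("c",)
for join_keys in [("a", "b", "c"), ("a", "c"), ("a",), ("c",)]:
    print(join_keys, shuffle_free_join_plan(partition_cols, bucket_cols, join_keys))
```

Under this rule, (a,b,c) needs no coalescing, (a,c) coalesces b while keeping the buckets, (a) coalesces b and ignores the buckets, and (c) alone is rejected because it would fall back to today's one-partition-per-bucket read.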

Implementation:
This will involve changes to some aspects of query planning. Here I list the
top-level changes:
- Add a new Distribution and Partitioning to describe this partition-value and
  bucket-column-hash data layout.
- Change the logical plan and the SparkPlan (DataSourceScanExec) to capture the
  above Distribution and Partitioning.
- Just like ensureRequirements adds a shuffle to satisfy child distributions,
  we could have it add a coalesce operator to club together buckets across
  folder partitions if required, and at different partition hierarchies
  according to the distribution required. For example: with
  PartitioningCols(a,b) and BucketingCols(c), a join that involves (a,c) can
  be answered by coalescing the b values within (a,c)'s partitions. This would
  be a metadata-only operation.
- Account for co-clustered partitions so that RDDs can be zipped in joins.
  This will also have to handle partitions that have been pruned out.
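
The last two points can be sketched in plain Python over a file listing (a toy model, not a proposed Spark API; the function names and paths are hypothetical). It assumes both sides of the join use the same number of buckets and the same bucketing hash, so equal bucket ids hold matching keys:

```python
from collections import defaultdict

def coalesce_groups(files, keep_cols):
    """Group file paths by (values of keep_cols, bucket id), using metadata only.

    `files` is a list of (partition_values: dict, bucket_id: int, path: str).
    No data is read; only the listing is regrouped.
    """
    groups = defaultdict(list)
    for values, bucket_id, path in files:
        key = (tuple(values[c] for c in keep_cols), bucket_id)
        groups[key].append(path)
    return dict(groups)

def zip_for_inner_join(left, right):
    """Pair up co-clustered groups; keys pruned from either side are dropped."""
    return {k: (left[k], right[k]) for k in sorted(left.keys() & right.keys())}

def listing(a_values, b_values, num_buckets):
    """Build a hypothetical listing for PartitioningCols(a,b), BucketingCols(c)."""
    return [({"a": a, "b": b}, k, f"a={a}/b={b}/part-{k:05d}")
            for a in a_values for b in b_values for k in range(num_buckets)]

left = coalesce_groups(listing([1, 2], ["x", "y"], 2), keep_cols=["a"])
# On the right side, every partition except a=2 has been pruned out.
right = coalesce_groups(listing([2], ["x", "y", "z"], 2), keep_cols=["a"])

pairs = zip_for_inner_join(left, right)
for key, (left_paths, right_paths) in pairs.items():
    print(key, len(left_paths), len(right_paths))
```

Each resulting pair is one join task: the b values are merged within every (a, bucket) group, and groups that survive on only one side are skipped, which is sufficient for an inner join (outer joins would need to keep them).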
There is a strong requirement for this functionality on my team (at Amazon).
I've opened a JIRA regarding this issue. I did consider DataSourcesV2, but it
looks like a better fit here.
I wanted some inputs regarding this. Is this approach feasible and is it
aligned with how Spark wants to handle native datasources in the future? 
Does anyone else have similar requirements?