spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Long, Andrew" <loand...@amazon.com.INVALID>
Subject Re: SortMergeJoinExec: Utilizing child partitioning when joining
Date Thu, 02 Jan 2020 22:28:27 GMT
“Thoughts on this approach?“

Just to warn you this is a hazardous optimization without cardinality information. Removing
columns from the hash exchange reduces entropy potentially resulting in skew. Also keep in
mind that if you reduce the number of columns on one side of the join you need todo it on
the other. This will require you to rewrite EnsureRequirements or add a special case to detect
this.

As a word of warning there’s a whole bunch of subtle things that EnsureRequirements is doing
and its really easy to unintentionally create performance regressions while making improvements
in other areas.

“Could someone help explain why the different join types have different output partitionings“

Long story short when a join happens the join exec zips together the partitions of the left
and right side so that one partition of the join has the elements of the left and right. 
In the case of an inner join this means that that the resulting RDD is now partitioned by
both the left join keys and the right join keys.  I’d suggest taking a look at the join
execs and take a look at how they build the result RDD from the partitions of the left and
right RDDs.(see doExecute(…))  left/right outer does look surprising though.

You should see something like…

left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>


Cheers Andrew

From: Brett Marcott <brett.marcott@gmail.com>
Date: Tuesday, December 31, 2019 at 11:49 PM
To: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: SortMergeJoinExec: Utilizing child partitioning when joining

Hi all,

I found this jira for an issue I ran into recently:
https://issues.apache.org/jira/browse/SPARK-28771

My initial idea for a fix is to change SortMergeJoinExec's (and ShuffledHashJoinExec) requiredChildDistribution.

At least if all below conditions are met, we could only require a subset of keys for partitioning:
left and right children's output partitionings are hashpartitioning with same numpartitions
left and right partition expressions have the same subset (with regards to indices) of their
respective join keys

If that subset of keys is returned by requiredChildDistribution, then EnsureRequirements.ensureDistributionAndOrdering
would not add a shuffle stage, hence reusing the children's partitioning.

1.Thoughts on this approach?

2. Could someone help explain why the different join types have different output partitionings
in SortMergeJoinExec.outputPartitioning<https://github.com/apache/spark/blob/cdcd43cbf2479b258f4c5cfa0f6306f475d25cf2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L85-L96>?

Thanks,
Brett


Mime
View raw message