spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Brett Marcott <brett.marc...@gmail.com>
Subject Re: SortMergeJoinExec: Utilizing child partitioning when joining
Date Thu, 02 Jan 2020 23:05:19 GMT
Thanks for the response Andrew.

*1. The approach*
The approach I mentioned will not introduce any new skew, so it should only
be worsen performance if the user was relying on the shuffle to fix skew
they had before.
The user can address this by either not introducing their own skewed
partition in the first place, or repartitioning with less skew again before
the join.
Today the user cannot change partitioning without changing the join
condition in a hacky way:    joinkey1 >= joinkey2 && joinkey1 <= joinkey2

The condition I mentioned below ensures that the *same* keys on left and
right formed their respective subsets:
      left and right partition expressions have the same subset (with
regards to indices) of their respective join keys

I don't believe EnsureRequirements will require any changes, just what the
Exec's are saying is required.

*2. output partitionings*
Yea I got as far as you mentioned, but I didn't at first get why for outer
joins only one side is used.
Now however, I think it makes sense because for outer joins you may be
introducing nulls for at least one side, which makes that sides
partitioning invalid right?

Warn New Year Regards,
Brett

On Thu, Jan 2, 2020 at 2:28 PM Long, Andrew <loandrew@amazon.com> wrote:

> “Thoughts on this approach?“
>
>
>
> Just to warn you this is a hazardous optimization without cardinality
> information. Removing columns from the hash exchange reduces entropy
> potentially resulting in skew. Also keep in mind that if you reduce the
> number of columns on one side of the join you need todo it on the other.
> This will require you to rewrite EnsureRequirements or add a special case
> to detect this.
>
>
>
> As a word of warning there’s a whole bunch of subtle things that
> EnsureRequirements is doing and its really easy to unintentionally create
> performance regressions while making improvements in other areas.
>
>
>
> “Could someone help explain why the different join types have different
> output partitionings“
>
>
>
> Long story short when a join happens the join exec zips together the
> partitions of the left and right side so that one partition of the join has
> the elements of the left and right.  In the case of an inner join this
> means that that the resulting RDD is now partitioned by both the left join
> keys and the right join keys.  I’d suggest taking a look at the join execs
> and take a look at how they build the result RDD from the partitions of the
> left and right RDDs.(see doExecute(…))  left/right outer does look
> surprising though.
>
>
>
> You should see something like…
>
>
>
> left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
>
>
>
>
>
> Cheers Andrew
>
>
>
> *From: *Brett Marcott <brett.marcott@gmail.com>
> *Date: *Tuesday, December 31, 2019 at 11:49 PM
> *To: *"dev@spark.apache.org" <dev@spark.apache.org>
> *Subject: *SortMergeJoinExec: Utilizing child partitioning when joining
>
>
>
> Hi all,
>
>
>
> I found this jira for an issue I ran into recently:
>
> https://issues.apache.org/jira/browse/SPARK-28771
>
>
>
> My initial idea for a fix is to change SortMergeJoinExec's (and
> ShuffledHashJoinExec) requiredChildDistribution.
>
>
>
> At least if all below conditions are met, we could only require a subset
> of keys for partitioning:
>
> left and right children's output partitionings are hashpartitioning with
> same numpartitions
>
> left and right partition expressions have the same subset (with regards to
> indices) of their respective join keys
>
>
>
> If that subset of keys is returned by requiredChildDistribution,
> then EnsureRequirements.ensureDistributionAndOrdering would not add a
> shuffle stage, hence reusing the children's partitioning.
>
>
>
> 1.Thoughts on this approach?
>
>
>
> 2. Could someone help explain why the different join types have different
> output partitionings in SortMergeJoinExec.outputPartitioning
> <https://github.com/apache/spark/blob/cdcd43cbf2479b258f4c5cfa0f6306f475d25cf2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L85-L96>
> ?
>
>
>
> Thanks,
>
> Brett
>
>
>
>
>

Mime
View raw message