spark-dev mailing list archives

From Brett Marcott <brett.marc...@gmail.com>
Subject Re: SortMergeJoinExec: Utilizing child partitioning when joining
Date Tue, 07 Jan 2020 08:00:00 GMT
1. Where can I find information on how to run standard performance
tests/benchmarks?
2. For a new major Spark version, are performance degradations to existing
queries disallowed even when they can be fixed by rewriting the query into an
equivalent one?

On Thu, Jan 2, 2020 at 3:05 PM Brett Marcott <brett.marcott@gmail.com>
wrote:

> Thanks for the response Andrew.
>
> *1. The approach*
> The approach I mentioned will not introduce any new skew, so it should
> only worsen performance if the user was relying on the shuffle to fix
> skew they had before.
> The user can address this by either not introducing their own skewed
> partition in the first place, or repartitioning with less skew again before
> the join.
> Today the user cannot change partitioning without changing the join
> condition in a hacky way:    joinkey1 >= joinkey2 && joinkey1 <= joinkey2
>
> The condition I mentioned below ensures that the *same* keys on left and
> right formed their respective subsets:
>       left and right partition expressions have the same subset (with
> regards to indices) of their respective join keys
>
> I don't believe EnsureRequirements will require any changes; only what the
> Execs declare as required would change.
>
> *2. output partitionings*
> Yeah, I got as far as you mentioned, but at first I didn't get why only one
> side is used for outer joins.
> Now, however, I think it makes sense: for outer joins you may be
> introducing nulls for at least one side, which makes that side's
> partitioning invalid, right?
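
One way to see this, as a plain-Python sketch rather than Spark code (the helper `partition_id` is hypothetical, and Python's `hash` stands in for Spark's hash function): in a left outer join, unmatched left rows carry None as their right key but stay in the partition chosen by the left key, so the output can no longer be described as hash partitioned by the right key.

```python
NUM_PARTITIONS = 4

def partition_id(key, num_partitions=NUM_PARTITIONS):
    """Partition a key would be sent to by a hash exchange (illustrative)."""
    return hash(key) % num_partitions

left_keys = list(range(8))
right_keys = {0, 2, 4, 6}  # odd left keys have no match on the right

# Left outer join output as (partition, left_key, right_key) triples.
# Each output row stays in the partition determined by its left key.
output = [
    (partition_id(k), k, k if k in right_keys else None)
    for k in left_keys
]

# Partitioning by the left key still holds for every output row.
left_valid = all(p == partition_id(lk) for p, lk, _ in output)

# A hash partitioning on the right key would place all equal keys,
# including None, in a single partition. The None rows are spread out.
null_partitions = {p for p, _, rk in output if rk is None}
right_valid = len(null_partitions) <= 1
```

Here `left_valid` holds while `right_valid` does not, which matches keeping only the left side's partitioning for a left outer join.
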
>
> Warm New Year Regards,
> Brett
>
> On Thu, Jan 2, 2020 at 2:28 PM Long, Andrew <loandrew@amazon.com> wrote:
>
>> “Thoughts on this approach?”
>>
>>
>>
>> Just to warn you, this is a hazardous optimization without cardinality
>> information. Removing columns from the hash exchange reduces entropy,
>> potentially resulting in skew. Also keep in mind that if you reduce the
>> number of columns on one side of the join you need to do it on the other.
>> This will require you to rewrite EnsureRequirements or add a special case
>> to detect this.
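
The skew risk described above can be illustrated with a small plain-Python sketch (not Spark code; `partition_sizes` is a hypothetical helper, Python's `hash` stands in for Spark's hash function, and the data is made up so that the first key column has very low cardinality). Hashing on fewer columns means fewer distinct hash inputs, so rows can pile up in a handful of partitions.

```python
from collections import Counter

NUM_PARTITIONS = 8

def partition_sizes(rows, key_indices, num_partitions=NUM_PARTITIONS):
    """Count rows per partition when hashing on the columns at key_indices."""
    counts = Counter(
        hash(tuple(row[i] for i in key_indices)) % num_partitions
        for row in rows
    )
    return [counts.get(p, 0) for p in range(num_partitions)]

# Made-up data: k1 takes only 2 distinct values, k2 takes 1000.
rows = [(i % 2, i) for i in range(1000)]

full_key_sizes = partition_sizes(rows, key_indices=(0, 1))
subset_key_sizes = partition_sizes(rows, key_indices=(0,))

# Hashing on k1 alone can fill at most 2 of the 8 partitions,
# so at least one partition holds 500 or more of the 1000 rows.
non_empty_subset = sum(1 for n in subset_key_sizes if n > 0)
largest_subset = max(subset_key_sizes)
```

Comparing `subset_key_sizes` with `full_key_sizes` shows how much balance is lost when only k1 is used. Whether that matters in practice depends on the real key cardinalities, which is the information the optimizer lacks.
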
>>
>>
>>
>> As a word of warning, there’s a whole bunch of subtle things that
>> EnsureRequirements is doing and it’s really easy to unintentionally create
>> performance regressions while making improvements in other areas.
>>
>>
>>
>> “Could someone help explain why the different join types have different
>> output partitionings”
>>
>>
>>
>> Long story short, when a join happens the join exec zips together the
>> partitions of the left and right side so that one partition of the join has
>> the elements of the left and right.  In the case of an inner join this
>> means that the resulting RDD is now partitioned by both the left join
>> keys and the right join keys.  I’d suggest taking a look at the join execs
>> and at how they build the result RDD from the partitions of the
>> left and right RDDs (see doExecute(…)).  Left/right outer does look
>> surprising though.
>>
>>
>>
>> You should see something like…
>>
>>
>>
>> left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
>>
>>
>>
>>
>>
>> Cheers Andrew
>>
>>
>>
>> *From: *Brett Marcott <brett.marcott@gmail.com>
>> *Date: *Tuesday, December 31, 2019 at 11:49 PM
>> *To: *"dev@spark.apache.org" <dev@spark.apache.org>
>> *Subject: *SortMergeJoinExec: Utilizing child partitioning when joining
>>
>>
>>
>> Hi all,
>>
>>
>>
>> I found this jira for an issue I ran into recently:
>>
>> https://issues.apache.org/jira/browse/SPARK-28771
>>
>>
>>
>> My initial idea for a fix is to change SortMergeJoinExec's (and
>> ShuffledHashJoinExec's) requiredChildDistribution.
>>
>>
>>
>> At least if all of the conditions below are met, we could require only a
>> subset of keys for partitioning:
>>
>> left and right children's output partitionings are HashPartitioning with
>> the same numPartitions
>>
>> left and right partition expressions have the same subset (with regards
>> to indices) of their respective join keys
>>
>>
>>
>> If that subset of keys is returned by requiredChildDistribution,
>> then EnsureRequirements.ensureDistributionAndOrdering would not add a
>> shuffle stage, hence reusing the children's partitioning.
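
A minimal sketch of why the two conditions above are enough, in plain Python rather than Spark (the helpers `partition_by` and `local_join` are hypothetical, and Python's `hash` stands in for Spark's hash function): when both sides are hash partitioned on the same subset of join-key positions with the same number of partitions, joining partition i of the left with partition i of the right yields the same rows as a global join on the full key.

```python
from itertools import product

NUM_PARTITIONS = 4  # assumed equal on both sides

def partition_by(rows, key_indices, num_partitions=NUM_PARTITIONS):
    """Hash-partition rows on the columns at key_indices."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        key = tuple(row[i] for i in key_indices)
        parts[hash(key) % num_partitions].append(row)
    return parts

def local_join(left_rows, right_rows, join_indices):
    """Inner join of two row lists on the columns at join_indices."""
    return [
        (l, r)
        for l, r in product(left_rows, right_rows)
        if all(l[i] == r[i] for i in join_indices)
    ]

# Rows are (k1, k2, payload); the join key is (k1, k2).
left = [(a, b, f"L{a}{b}") for a in range(6) for b in range(3)]
right = [(a, b, f"R{a}{b}") for a in range(0, 6, 2) for b in range(4)]
join_indices = (0, 1)

# Both sides are partitioned only on k1, a subset of the join key.
left_parts = partition_by(left, key_indices=(0,))
right_parts = partition_by(right, key_indices=(0,))

# Join partition i with partition i only, as a zipPartitions-style join does.
zipped = [
    pair
    for lp, rp in zip(left_parts, right_parts)
    for pair in local_join(lp, rp, join_indices)
]
global_join = local_join(left, right, join_indices)
same_result = sorted(zipped) == sorted(global_join)
```

Rows that agree on (k1, k2) necessarily agree on k1, so they always meet in the same partition index and no extra shuffle is needed for correctness. The sketch says nothing about balance: partitioning on k1 alone may still be skewed.
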
>>
>>
>>
>> 1. Thoughts on this approach?
>>
>>
>>
>> 2. Could someone help explain why the different join types have different
>> output partitionings in SortMergeJoinExec.outputPartitioning
>> <https://github.com/apache/spark/blob/cdcd43cbf2479b258f4c5cfa0f6306f475d25cf2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L85-L96>
>> ?
>>
>>
>>
>> Thanks,
>>
>> Brett
>>
>>
>>
>>
>>
>
