spark-dev mailing list archives

From Petar Zecevic <petar.zece...@gmail.com>
Subject Re: Sort-merge join improvement
Date Mon, 23 Apr 2018 16:29:58 GMT
Hi,

The PR tests completed successfully
(https://github.com/apache/spark/pull/21109).

Can you please review the patch and merge it upstream if you think it's OK?

Thanks,

Petar


On 4/18/2018 at 4:52 PM, Petar Zecevic wrote:
> As instructed offline, I opened a JIRA for this:
>
> https://issues.apache.org/jira/browse/SPARK-24020
>
> I will create a pull request soon.
>
>
> On 4/17/2018 at 6:21 PM, Petar Zecevic wrote:
>> Hello everybody
>>
>> We (at the University of Zagreb and the University of Washington) have
>> implemented an optimization of Spark's sort-merge join (SMJ) that has
>> improved the performance of our jobs considerably, and we would like to
>> know whether the Spark community thinks it would be useful to include it
>> in the main distribution.
>>
>> The problem we are solving is the case where you have two big tables
>> partitioned by column X and also sorted by column Y (within partitions),
>> and you need to calculate an expensive function on the joined rows.
>> During a sort-merge join, Spark will cross-join all rows that
>> have the same X value and calculate the function's value on all of
>> them. If the two tables have a large number of rows per X, this can
>> result in a huge number of calculations.
>>
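
To make the cost described above concrete, here is a minimal Python sketch
(not Spark code; `cross_join_cost` is a hypothetical helper introduced only
for illustration). It counts how many row pairs a plain equi-join on X
produces, which is also how many times the expensive function would be
evaluated when the range condition is applied only afterwards.

```python
from collections import Counter


def cross_join_cost(left_keys, right_keys):
    """Count the row pairs produced by an equi-join on X alone.

    Each X value contributes (rows on the left) * (rows on the right)
    pairs, because every left row is paired with every right row that
    has the same key.
    """
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    shared_keys = left_counts.keys() & right_counts.keys()
    return sum(left_counts[x] * right_counts[x] for x in shared_keys)


# Hypothetical data: key "a" is heavily populated on both sides.
left = ["a"] * 1000 + ["b"] * 10
right = ["a"] * 1000 + ["b"] * 10
pairs = cross_join_cost(left, right)
```

The count grows with the product of the per-key row counts, so a few
heavily populated X values dominate the total work.
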
>> Our optimization allows you to reduce the number of matching rows per X
>> using a range condition on Y columns of the two tables. Something like:
>>
>> ... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
>>
>> The way SMJ is currently implemented, these extra conditions have no
>> influence on the number of rows (per X) being checked because these
>> extra conditions are put in the same block with the function being
>> calculated.
>>
>> Our optimization changes the sort-merge join so that, when these extra
>> conditions are specified, a queue is used instead of the
>> ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a
>> moving window across the values from the right relation as the left row
>> changes. You could call this a combination of an equi-join and a theta
>> join (we call it "sort-merge inner range join").
>>
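
The moving window described above can be sketched in plain Python for the
rows of a single X value. This is only an illustration under stated
assumptions: both inputs are lists of numeric Y values already sorted in
ascending order, the condition is the symmetric one from the example
(|t1.Y - t2.Y| <= d), and the name `range_join_one_key` and the use of
`collections.deque` are hypothetical choices that do not reflect the code
in the pull request.

```python
from collections import deque


def range_join_one_key(left_ys, right_ys, d):
    """Pair each left Y with the right Ys in [left - d, left + d].

    Both inputs must be sorted ascending. The deque is the moving
    window over the right side for the current left row.
    """
    window = deque()
    right_iter = iter(right_ys)
    pending = next(right_iter, None)  # next right row not yet admitted
    pairs = []
    for left_y in left_ys:
        # Admit right rows that are not above the upper bound.
        while pending is not None and pending <= left_y + d:
            window.append(pending)
            pending = next(right_iter, None)
        # Evict right rows below the lower bound. Left Ys only grow,
        # so an evicted row cannot match any later left row.
        while window and window[0] < left_y - d:
            window.popleft()
        pairs.extend((left_y, right_y) for right_y in window)
    return pairs


result = range_join_one_key([1, 5, 10], [0, 2, 4, 6, 11], d=1)
```

Each right row enters and leaves the window at most once, so the work per
X value is proportional to the input sizes plus the number of emitted
pairs, rather than to the full cross product.
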
>> Potential use-cases for this are joins based on spatial or temporal
>> distance calculations.
>>
>> The optimization is triggered automatically when an equi-join expression
>> is present AND lower and upper range conditions on a secondary column
>> are specified. If the tables aren't sorted by both columns, appropriate
>> sorts will be added.
>>
>>
>> We have several questions:
>>
>> 1. Do you see any other way to optimize queries like these (eliminate
>> unnecessary calculations) without changing the sort-merge join algorithm?
>>
>> 2. We believe there is a more general pattern here and that this could
>> help in other similar situations where secondary sorting is available.
>> Would you agree?
>>
>> 3. Would you like us to open a JIRA ticket and create a pull request?
>>
>> Thanks,
>>
>> Petar Zecevic
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>>
>
