spark-dev mailing list archives

From Petar Zecevic <>
Subject Re: Sort-merge join improvement
Date Tue, 15 May 2018 08:56:41 GMT
Based on review feedback, I put additional effort into fixing the case when
whole-stage codegen is turned off.

Sort-merge join with additional range conditions is now about 10x faster
than the non-optimized SMJ (the exact gain varies with the use case), both
with whole-stage codegen turned on and with it turned off.

Merging this would help us tremendously, and I believe it can be useful
in other applications, too.

Can you please review ( and 
merge the patch?

Thank you,

Petar Zecevic

On 4/23/2018 at 6:28 PM, Petar Zecevic wrote:
> Hi,
> The PR tests completed successfully
> (
> Can you please review the patch and merge it upstream if you think it's OK?
> Thanks,
> Petar
> On 4/18/2018 at 4:52 PM, Petar Zecevic wrote:
>> As instructed offline, I opened a JIRA for this:
>> I will create a pull request soon.
>> On 4/17/2018 at 6:21 PM, Petar Zecevic wrote:
>>> Hello everybody,
>>> We (at the University of Zagreb and the University of Washington) have
>>> implemented an optimization of Spark's sort-merge join (SMJ) that has
>>> considerably improved the performance of our jobs, and we would like to
>>> know whether the Spark community thinks it would be useful to include
>>> this in the main distribution.
>>> The problem we are solving is the case where you have two big tables
>>> partitioned by column X but also sorted by column Y (within
>>> partitions), and you need to calculate an expensive function on the
>>> joined rows.
>>> During a sort-merge join, Spark will cross-join all rows that have the
>>> same X value and calculate the function on every resulting pair. If the
>>> two tables have a large number of rows per X, this can result in a huge
>>> number of calculations.
>>> Our optimization allows you to reduce the number of matching rows per X
>>> using a range condition on Y columns of the two tables. Something like:
>>> ... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
>>> As SMJ is currently implemented, these extra conditions have no
>>> influence on the number of rows (per X) being checked, because they are
>>> evaluated in the same block as the function being calculated.
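To illustrate the behavior described above, here is a minimal Python sketch (not Spark code) under simplifying assumptions: each relation is a list of (x, y) tuples sorted by (x, y), t1 is the left side and t2 the right side, and the hypothetical function `naive_smj` buffers all right rows with the current key (the role ExternalAppendOnlyUnsafeRowArray plays in Spark) and then tests the range condition on every pair. The `visited` counter shows that the number of pairs checked does not depend on `d`.

```python
def naive_smj(left, right, d):
    """Per-key cross join, then filter: t1.x = t2.x AND t2.y - d <= t1.y <= t2.y + d.

    left/right: lists of (x, y) tuples sorted by (x, y).
    Returns (matches, pairs_visited).
    """
    matches, visited = [], 0
    i = j = 0
    n, m = len(left), len(right)
    while i < n and j < m:
        lx, rx = left[i][0], right[j][0]
        if lx < rx:
            i += 1
        elif lx > rx:
            j += 1
        else:
            # Buffer every right row with key lx.
            j_end = j
            while j_end < m and right[j_end][0] == lx:
                j_end += 1
            buffer = right[j:j_end]
            # Each left row with key lx is paired with the whole buffer;
            # the range condition is only a filter applied to each pair.
            while i < n and left[i][0] == lx:
                ly = left[i][1]
                for _, ry in buffer:
                    visited += 1
                    if ry - d <= ly <= ry + d:
                        matches.append((lx, ly, ry))
                i += 1
            j = j_end
    return matches, visited
```

With 10 rows per key on each side, shrinking `d` reduces the number of matches but every call still visits all 100 pairs.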
>>> Our optimization changes the sort-merge join so that, when these extra
>>> conditions are specified, a queue is used instead of the
>>> ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a
>>> moving window across the values from the right relation as the left row
>>> changes. You could call this a combination of an equi-join and a theta
>>> join (we call it "sort-merge inner range join").
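A minimal Python sketch of the moving-window idea, again over lists of (x, y) tuples sorted by (x, y) and assuming d >= 0; `range_smj` and the use of `collections.deque` as the queue are illustrative choices, not the actual patch. Because left rows with the same key arrive in increasing y, both bounds `ly - d` and `ly + d` only move forward, so right rows are appended at the back once their y is within the upper bound and popped from the front once they fall below the lower bound.

```python
from collections import deque


def range_smj(left, right, d):
    """Sort-merge 'inner range join': t1.x = t2.x AND t2.y - d <= t1.y <= t2.y + d.

    left/right: lists of (x, y) tuples sorted by (x, y); assumes d >= 0.
    Returns (matches, pairs_visited).
    """
    matches, visited = [], 0
    i = j = 0
    n, m = len(left), len(right)
    while i < n and j < m:
        lx, rx = left[i][0], right[j][0]
        if lx < rx:
            i += 1
        elif lx > rx:
            j += 1
        else:
            # Queue of right rows with key lx whose y lies in [ly - d, ly + d].
            window = deque()
            while i < n and left[i][0] == lx:
                ly = left[i][1]
                # Upper bound moved forward: enqueue right rows with y <= ly + d.
                while j < m and right[j][0] == lx and right[j][1] <= ly + d:
                    window.append(right[j])
                    j += 1
                # Lower bound moved forward: these rows cannot match any later left row.
                while window and window[0][1] < ly - d:
                    window.popleft()
                # Only rows inside the window are paired (and would get the expensive function).
                for _, ry in window:
                    visited += 1
                    matches.append((lx, ly, ry))
                i += 1
            # Skip right rows with key lx that the upper bound never reached.
            while j < m and right[j][0] == lx:
                j += 1
    return matches, visited
```

With X equal to 1 for all 10 rows on each side (y from 0 to 9) and d = 1, this visits 28 pairs, exactly the matching ones, instead of the 100 a full per-key cross join would check.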
>>> Potential use-cases for this are joins based on spatial or temporal
>>> distance calculations.
>>> The optimization is triggered automatically when an equi-join expression
>>> is present AND lower and upper range conditions on a secondary column
>>> are specified. If the tables aren't sorted by both columns, appropriate
>>> sorts will be added.
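The triggering rule can be sketched as a toy planner check. Everything here is hypothetical: predicates are modelled by an invented `Cond` tuple of the form `t1.left_col <op> t2.right_col + offset` rather than Spark's Catalyst expressions, and `plan_range_join` only returns the sort orders both sides would need. The BETWEEN condition above becomes `Cond(">=", "Y", "Y", -d)` together with `Cond("<=", "Y", "Y", d)`.

```python
from typing import NamedTuple, Optional


class Cond(NamedTuple):
    """Hypothetical join predicate: t1.<left_col> <op> t2.<right_col> + offset."""
    op: str            # "=", ">=" or "<="
    left_col: str      # column of t1 (left relation)
    right_col: str     # column of t2 (right relation)
    offset: float = 0.0


def plan_range_join(conds) -> Optional[dict]:
    """Return the required sort orders if the range-join pattern is present, else None."""
    # Equi-join keys: plain equality between a t1 column and a t2 column.
    keys = [(c.left_col, c.right_col) for c in conds if c.op == "=" and c.offset == 0]
    if not keys:
        return None
    # A secondary column pair needs BOTH a lower and an upper bound.
    lowers = {(c.left_col, c.right_col) for c in conds if c.op == ">="}
    uppers = {(c.left_col, c.right_col) for c in conds if c.op == "<="}
    candidates = sorted((lowers & uppers) - set(keys))
    if not candidates:
        return None  # fall back to the ordinary sort-merge join
    (lk, rk), (ly, ry) = keys[0], candidates[0]
    # Both sides must be sorted by the equi-join key, then by the range column.
    return {"left_sort": [lk, ly], "right_sort": [rk, ry]}
```

A condition with only one bound, or with no equi-join key, returns None, mirroring the "equi-join AND lower AND upper bound" requirement described above.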
>>> We have several questions:
>>> 1. Do you see any other way to optimize queries like these (eliminate
>>> unnecessary calculations) without changing the sort-merge join algorithm?
>>> 2. We believe there is a more general pattern here and that this could
>>> help in other similar situations where secondary sorting is available.
>>> Would you agree?
>>> 3. Would you like us to open a JIRA ticket and create a pull request?
>>> Thanks,
>>> Petar Zecevic
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: