spark-dev mailing list archives

From Petar Zecevic <petar.zece...@gmail.com>
Subject Sort-merge join improvement
Date Wed, 18 Apr 2018 01:22:10 GMT
Hello everybody,

We (at the University of Zagreb and the University of Washington) have 
implemented an optimization of Spark's sort-merge join (SMJ) that has 
considerably improved the performance of our jobs, and we would like to 
know whether the Spark community thinks it would be useful to include 
it in the main distribution.

The problem we are solving is the case where you have two big tables 
partitioned by a column X but also sorted by a column Y (within 
partitions), and you need to calculate an expensive function on the 
joined rows. During a sort-merge join, Spark will cross-join all rows 
that have the same X value and evaluate the function on every resulting 
pair. If the two tables have a large number of rows per X, this can 
result in a huge number of calculations: with n matching rows per X 
value on each side, the function is evaluated n^2 times for that value.

Our optimization lets you reduce the number of matching rows per X 
using a range condition on the Y columns of the two tables. Something like:

... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
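
For a concrete (entirely hypothetical) example in Spark Scala: suppose 
tables events(id, ts, value) and readings(id, ts, value) are partitioned 
by id and sorted by ts within partitions, and expensive_score is some 
costly UDF. The table names, columns, and UDF are all made up for 
illustration; only the query shape matters:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("range-join-example").getOrCreate()

  // Stand-in for the genuinely expensive per-pair function.
  spark.udf.register("expensive_score", (a: Double, b: Double) => math.abs(a - b))

  // Pair each event only with readings taken at most 30 time units away,
  // instead of with every reading that shares its id.
  val joined = spark.sql("""
    SELECT e.id, e.ts, expensive_score(e.value, r.value) AS score
    FROM events e
    JOIN readings r
      ON e.id = r.id
     AND r.ts BETWEEN e.ts - 30 AND e.ts + 30
  """)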

The way SMJ is currently implemented, these extra conditions have no 
influence on the number of rows (per X) being checked, because they are 
evaluated in the same code block as the function itself, after each 
pair has already been produced.

Our optimization changes the sort-merge join so that, when these extra 
conditions are specified, a queue is used instead of the 
ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a 
moving window across the values from the right relation as the left row 
changes. You could call this a combination of an equi-join and a theta 
join (we call it "sort-merge inner range join").
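
To illustrate the idea, here is a simplified, standalone sketch of the 
moving-window logic for a single X group. This is not our actual 
implementation (which replaces ExternalAppendOnlyUnsafeRowArray inside 
SMJ); plain Scala collections stand in for Spark's internal row 
structures:

  import scala.collection.mutable

  // Both inputs must be sorted ascending by y; x is the equi-join key.
  case class SimpleRow(x: Int, y: Long, payload: String)

  def rangeJoinGroup(left: Seq[SimpleRow], right: Seq[SimpleRow],
                     d: Long): Seq[(SimpleRow, SimpleRow)] = {
    val window = mutable.Queue.empty[SimpleRow]  // stands in for the row buffer
    val out = mutable.ArrayBuffer.empty[(SimpleRow, SimpleRow)]
    var i = 0                                    // next right row not yet seen
    for (l <- left) {
      // Evict rows that fell below the lower bound; since left is sorted
      // by y, they can never match a later left row either.
      while (window.nonEmpty && window.head.y < l.y - d) window.dequeue()
      // Skip unread right rows that are already below the lower bound.
      while (i < right.length && right(i).y < l.y - d) i += 1
      // Admit right rows up to the upper bound.
      while (i < right.length && right(i).y <= l.y + d) {
        window.enqueue(right(i)); i += 1
      }
      // Only rows inside [l.y - d, l.y + d] reach the join function, and
      // each right row is enqueued and dequeued at most once.
      window.foreach(r => out += ((l, r)))
    }
    out.toSeq
  }

For example, with d = 2:

  val left  = Seq(SimpleRow(1, 10, "a"), SimpleRow(1, 14, "b"))
  val right = Seq(SimpleRow(1, 9, "p"), SimpleRow(1, 11, "q"), SimpleRow(1, 20, "r"))
  rangeJoinGroup(left, right, 2).foreach(println)
  // prints (SimpleRow(1,10,a),SimpleRow(1,9,p)) and (SimpleRow(1,10,a),SimpleRow(1,11,q))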

Potential use cases include joins based on spatial or temporal 
distance calculations.

The optimization is triggered automatically when an equi-join expression 
is present AND lower and upper range conditions on a secondary column 
are specified. If the tables aren't sorted by both columns, appropriate 
sorts will be added.
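
For example (with hypothetical DataFrames eventsDf and readingsDf), 
tables written with Spark's existing bucketing API may already satisfy 
both orderings, so the planner should not need to add extra sorts:

  // Bucket by the equi-join column and sort by (id, ts) within buckets.
  eventsDf.write.bucketBy(64, "id").sortBy("id", "ts").saveAsTable("events")
  readingsDf.write.bucketBy(64, "id").sortBy("id", "ts").saveAsTable("readings")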


We have several questions:

1. Do you see any other way to optimize queries like these (eliminate 
unnecessary calculations) without changing the sort-merge join algorithm?

2. We believe there is a more general pattern here and that this could 
help in other similar situations where secondary sorting is available. 
Would you agree?

3. Would you like us to open a JIRA ticket and create a pull request?

Thanks,

Petar Zecevic



