If you are using Spark 1.5 or later, the best way is to use the DataFrame API directly instead of SQL. The DataFrame API lets you specify a broadcast join hint, which will force the broadcast join.
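
A minimal sketch of that hint, assuming Spark 1.6 with a `SQLContext` and that `table1` and `table2` from the original question are registered tables. The names `BroadcastHintSketch` and `joinWithHint` are only illustrative and are not part of Spark; `broadcast` is the function in `org.apache.spark.sql.functions`.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.broadcast

object BroadcastHintSketch {
  // Hypothetical helper: joins the filtered rows of table1 to table2,
  // marking the filtered side as the one to broadcast.
  def joinWithHint(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._

    val a = sqlContext.table("table1")
    val b = sqlContext.table("table2")

    // Same predicate as in the SQL statement: a.c2 < 1000
    val small = a.filter($"c2" < 1000)

    // broadcast() only attaches a hint; the join itself is still lazy.
    broadcast(small).join(b, small("c1") === b("c1"))
  }
}
```

Calling `explain()` on the returned DataFrame should show whether the planner actually chose a BroadcastHashJoin for your data.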


From: mich.talebzadeh@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin.m.s@gmail.com
CC: zhengyt@dtdream.com; user@spark.apache.org

What sort of tables are these?

Can you register the result set as a temp table and do a join on that, assuming the result set is going to be small?

s.filter(($"c2" < 1000)).registerTempTable("tmp")

and then do a join between tmp and table2.


On 20 June 2016 at 12:38, Takeshi Yamamuro <linguin.m.s@gmail.com> wrote:
It seems hard to predict the output size of filters because Spark currently has only limited statistics about input data. A few hours ago, Reynold created a ticket for a cost-based optimizer framework: https://issues.apache.org/jira/browse/SPARK-16026.
If you have ideas, questions, or suggestions, feel free to join the discussion.

// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247 <zhengyt@dtdream.com> wrote:

Thanks for your reply. In fact, that is what I just did.

But my question is:
Can we make Spark's join behavior smarter, so that it turns a SortMergeJoin into a BroadcastHashJoin automatically when it finds that an output RDD is small enough?

From: Takeshi Yamamuro <linguin.m.s@gmail.com>
Sent: Monday, 20 June 2016 19:16
To: 梅西0247 <zhengyt@dtdream.com>
Cc: user <user@spark.apache.org>
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?


How about caching the result of `select * from a where a.c2 < 1000`, then joining it with the other table?
You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to enable broadcast joins for the result table.
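
A rough sketch of this approach, assuming Spark 1.6 with a `SQLContext` and the `table1`/`table2` names from the original question. `CachedFilterJoinSketch`, `joinViaCachedFilter` and `thresholdBytes` are hypothetical names, and the threshold value is something you would have to pick for your own data. The assumption here is that once the cached result is materialized, the planner can see its size and compare it against the threshold.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

object CachedFilterJoinSketch {
  // Hypothetical helper: caches the filtered rows, registers them as "tmp",
  // and joins "tmp" with table2 through plain SQL.
  def joinViaCachedFilter(sqlContext: SQLContext, thresholdBytes: Long): DataFrame = {
    // Size limit (in bytes) below which a table may be broadcast automatically
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", thresholdBytes.toString)

    val small = sqlContext.sql("select * from table1 a where a.c2 < 1000")
    small.cache()
    small.count() // run an action so the cached data is actually materialized
    small.registerTempTable("tmp")

    sqlContext.sql("select * from tmp a join table2 b on a.c1 = b.c1")
  }
}
```

Whether the planner then picks a broadcast join depends on the size it estimates for the cached data, so it is worth checking the result with `explain()`.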

// maropu

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247 <zhengyt@dtdream.com> wrote:
Hi everyone,

I ran a SQL join statement on Spark 1.6.1 like this:
select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;
and it took quite a long time because it is a SortMergeJoin and the two tables are big.

In fact, the size of the filter result (select * from a where a.c2 < 1000) is very small, and I think a better solution is to use a broadcast join with the filter result, but I know the physical plan is static and won't be changed.

So, can we make the physical plan more adaptive? (In this example, I mean using a BroadcastHashJoin instead of a SortMergeJoin automatically.)

Takeshi Yamamuro
