spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: SparkSQL - Limit pushdown on BroadcastHashJoin
Date Mon, 18 Apr 2016 18:09:41 GMT
I could be wrong, but I think we currently do that through whole-stage
codegen. After processing each row on the stream side, the generated code
for the broadcast join checks whether it has hit the limit (through a
check called shouldStop).

It is not an optimal solution, because a single stream-side row might
output multiple hits before the check runs, but that is usually not a problem.
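
The per-row check described above can be sketched as a toy model in plain Python. This is only an illustration, not Spark's generated code: the function name, the `should_stop` helper, and the sample rows are all hypothetical, and it assumes the operator above the join trims any extra rows.

```python
from collections import defaultdict


def broadcast_hash_join_with_limit(stream_rows, build_rows, limit):
    """Toy inner hash join that checks the limit once per stream-side row."""
    # Build side: hash table from join key to every value with that key.
    build_table = defaultdict(list)
    for key, value in build_rows:
        build_table[key].append(value)

    output = []
    stream_rows_read = 0

    def should_stop():
        # Hypothetical stand-in for the shouldStop check in generated code.
        return len(output) >= limit

    for key, value in stream_rows:
        stream_rows_read += 1
        # One stream row may match several build rows; all of them are
        # emitted before the limit is checked again.
        for match in build_table.get(key, []):
            output.append((key, value, match))
        if should_stop():
            break

    # Assumption: the consumer above the join trims any overshoot.
    return output[:limit], len(output), stream_rows_read


stream = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
build = [(1, "x"), (1, "y"), (1, "z"), (3, "w")]
rows, produced, read = broadcast_hash_join_with_limit(stream, build, limit=2)
```

With these sample rows the first stream row alone matches three build rows, so the join produces three rows for a limit of two, but it still stops after reading a single stream-side row instead of scanning the whole input.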


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.andrew@gmail.com> wrote:

> While you can't automatically push the limit *through* the join, we could
> push it *into* the join (stop processing after generating 10 records). I
> believe that is what Rajesh is suggesting.
>
> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
> hvanhovell@questtec.nl> wrote:
>
>> I am not sure if you can push a limit through a join. This becomes
>> problematic if not all keys are present on both sides; in such a case a
>> limit can produce fewer rows than the set limit.
>>
>> This might be a rare case in which whole stage codegen is slower, due to
>> the fact that we need to buffer the result of such a stage. You could try
>> to disable it by setting "spark.sql.codegen.wholeStage" to false.
>>
>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamohan@gmail.com>:
>>
>>> Hi,
>>>
>>> I ran the following query in Spark (latest master codebase) and it took
>>> a long time to complete even though it was a broadcast hash join.
>>>
>>> It appears that the limit is applied only after the complete join has
>>> been computed. Shouldn't the limit be pushed into BroadcastHashJoin
>>> (so that it stops processing after generating 10 rows)? Please let me
>>> know if my understanding of this is wrong.
>>>
>>>
>>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;
>>>
>>> >>>>
>>> | == Physical Plan ==
>>> CollectLimit 10
>>> +- WholeStageCodegen
>>>    :  +- Project [l_partkey#893]
>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>    :        :- Project [l_partkey#893]
>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>    :        +- INPUT
>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>       +- WholeStageCodegen
>>>          :  +- Project [ps_partkey#908]
>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>  |
>>> >>>>
>>>
>>>
>>>
>>>
>>> --
>>> ~Rajesh.B
>>>
>>
>>
>
