spark-issues mailing list archives

From "Jurriaan Pruis (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-15415) Marking partitions for broadcast broken
Date Fri, 20 May 2016 06:42:12 GMT

    [ https://issues.apache.org/jira/browse/SPARK-15415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15292828#comment-15292828 ]

Jurriaan Pruis commented on SPARK-15415:
----------------------------------------

[~rxin] I could try to work on that. The reason I ran into this issue was that Spark was broadcasting
an 8M-row table, which caused my driver to go OOM. Reducing the autoBroadcastJoinThreshold
didn't help unless I reduced it to very small values, which resulted in nothing being
broadcast at all (which is probably a bug too, maybe it's calculating the sizeInBytes
incorrectly?). So I tried to work around the problem by reducing the threshold to zero and only marking
the tables I needed to be broadcast, but that didn't work. I guess what I really needed was a
'do not broadcast' hint.
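
Roughly what I was trying looks like this (a minimal sketch; the table names are hypothetical and sqlCtx is the SQLContext provided by the pyspark shell):

{code}
from pyspark.sql.functions import broadcast

# Setting the threshold to 0 effectively disables size-based auto broadcast,
# since no table has an estimated size below zero bytes.
sqlCtx.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

facts = sqlCtx.table("facts")   # large table, roughly 8M rows
dims = sqlCtx.table("dims")     # small table we want broadcast explicitly

# Mark only the small table for broadcast. In 1.6.1 this gives a
# BroadcastHashJoin; in 2.0.0 it falls back to a SortMergeJoin,
# which is what this ticket is about.
facts.join(broadcast(dims), "id").explain()
{code}
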

> Marking partitions for broadcast broken
> ---------------------------------------
>
>                 Key: SPARK-15415
>                 URL: https://issues.apache.org/jira/browse/SPARK-15415
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Jurriaan Pruis
>
> I couldn't get the broadcast(DataFrame) SQL function to work in Spark 2.0, while it does work in Spark 1.6.1:
> {code}
> $ pyspark --conf spark.sql.autoBroadcastJoinThreshold=0
> >>> df = sqlCtx.range(1000);df2 = sqlCtx.range(1000);df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
> == Physical Plan ==
> Project [id#0L]
> +- BroadcastHashJoin [id#0L], [id#1L], BuildRight
>    :- ConvertToUnsafe
>    :  +- Scan ExistingRDD[id#0L]
>    +- ConvertToUnsafe
>       +- Scan ExistingRDD[id#1L]
> {code}
> While in Spark 2.0 this results in:
> {code}
> >>> df = sqlCtx.range(1000);df2 = sqlCtx.range(1000);df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#6L]
> :     +- SortMergeJoin [id#6L], [id#9L], Inner, None
> :        :- INPUT
> :        +- INPUT
> :- WholeStageCodegen
> :  :  +- Sort [id#6L ASC], false, 0
> :  :     +- INPUT
> :  +- Exchange hashpartitioning(id#6L, 200), None
> :     +- WholeStageCodegen
> :        :  +- Range 0, 1, 8, 1000, [id#6L]
> +- WholeStageCodegen
>    :  +- Sort [id#9L ASC], false, 0
>    :     +- INPUT
>    +- ReusedExchange [id#9L], Exchange hashpartitioning(id#6L, 200), None
> {code}
>  
> While it should look like this (the output you get when you remove the spark.sql.autoBroadcastJoinThreshold conf):
> {code}
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0L]
> :     +- BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight, None
> :        :- Range 0, 1, 8, 1000, [id#0L]
> :        +- INPUT
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
>    +- WholeStageCodegen
>       :  +- Range 0, 1, 8, 1000, [id#3L]
> {code}



