spark-user mailing list archives

From Takeshi Yamamuro <linguin....@gmail.com>
Subject Re: broadcast fails on join
Date Tue, 30 Aug 2016 13:56:28 GMT
Hi,

How about making the value of `spark.sql.broadcastTimeout` bigger?
The default is 300 seconds.
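
For example, in PySpark (a minimal sketch, assuming `spark` is your
SparkSession and that the value is given in seconds):

spark.conf.set("spark.sql.broadcastTimeout", 1200)  # raise from the 300s default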

// maropu


On Tue, Aug 30, 2016 at 9:09 PM, AssafMendelson <assaf.mendelson@rsa.com>
wrote:

> Hi,
>
> I am seeing a broadcast failure when doing a join as follows:
>
> Assume I have a dataframe df with ~80 million records
>
> I do:
>
> df2 = df.filter(cond)     # reduces to ~50 million records
>
> grouped = broadcast(df.groupby(df2.colA).count())
>
> total = df2.join(grouped, df2.colA == grouped.colA, "inner")
>
> total.filter(total["count"] > 10).show()
>
>
>
> This fails with an exception:
>
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>     at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>     at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>     at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
>     at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>     at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
>     at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
>     at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>     at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
>     at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
>     at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>     at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
>
>
>
> However, if I do:
>
> grouped.cache()
> grouped.count()
>
> before the join, everything is fine. (For the record: the grouped dataframe
> is ~1.5MB when cached in memory, I have more than 4GB per executor across 8
> executors, and the full dataframe is ~8GB.)
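>
> To double-check that the broadcast is actually planned (a minimal sketch,
> using the `total` dataframe from above):
>
> total.explain()  # the plan should show BroadcastExchange / BroadcastHashJoin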
>
>
>
> Thanks,
>
>                 Assaf.



-- 
---
Takeshi Yamamuro
