spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Yuming Wang (Jira)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-29655) Prefer bucket join if adaptive execution is enabled and maxNumPostShufflePartitions != bucket number
Date Wed, 30 Oct 2019 10:13:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-29655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962893#comment-16962893
] 

Yuming Wang commented on SPARK-29655:
-------------------------------------

cc [~Jk_Self]

> Prefer bucket join if adaptive execution is enabled and maxNumPostShufflePartitions !=
bucket number
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29655
>                 URL: https://issues.apache.org/jira/browse/SPARK-29655
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> Prefer bucketing join if adaptive execution is enabled and maxNumPostShufflePartitions
!= bucket number.  How to reproduce:
> {code:scala}
> import org.apache.spark.sql.SaveMode
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> spark.conf.set("spark.sql.shuffle.partitions", 4)
> val bucketedTableName = "bucketed_table"
> spark.range(10).write.bucketBy(4, "id").sortBy("id").mode(SaveMode.Overwrite).saveAsTable(bucketedTableName)
> val bucketedTable = spark.table(bucketedTableName)
> val df = spark.range(4)
> df.join(bucketedTable, "id").explain()
> spark.conf.set("spark.sql.adaptive.enabled", true)
> spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 5)
> df.join(bucketedTable, "id").explain()
> {code}
> Output:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan(isFinalPlan=false)
> +- Project [id#5L]
>    +- SortMergeJoin [id#5L], [id#3L], Inner
>       :- Sort [id#5L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(id#5L, 5), true, [id=#92]
>       :     +- Range (0, 4, step=1, splits=16)
>       +- Sort [id#3L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#3L, 5), true, [id=#93]
>             +- Project [id#3L]
>                +- Filter isnotnull(id#3L)
>                   +- FileScan parquet default.bucketed_table[id#3L] Batched: true, DataFilters:
[isnotnull(id#3L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview-bin-hadoop3.2/spark-warehouse/bucketed_table],
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>,
SelectedBucketsCount: 4 out of 4
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message