spark-issues mailing list archives

From "Xianjin YE (Jira)" <>
Subject [jira] [Commented] (SPARK-24193) Sort by disk when number of limit is big in TakeOrderedAndProjectExec
Date Fri, 08 May 2020 06:02:00 GMT


Xianjin YE commented on SPARK-24193:

Hi, [] [~cloud_fan], the fallback config has a correctness issue; we may
need to revert this change in Spark 2.4 and Spark 3.0.


Steps to reproduce:

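    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    val spark = SparkSession
      .builder()
      .appName("Spark TopK test")
      .master("local-cluster[8, 1, 1024]")
      .getOrCreate()
    // ids 100001 down to 2, spread over 10 partitions
    val data = spark.range(100000, 0, -1, 10).toDF("id").selectExpr("id + 1 as id")

    // limit (200) >= threshold (100): the fallback path is used instead of TakeOrderedAndProjectExec
    spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, 100)
    val topKInSort = data.orderBy("id").limit(200).rdd.collect()

    // threshold effectively disabled: the in-memory TakeOrderedAndProjectExec path is used
    spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, Int.MaxValue)
    val topKInMemory = data.orderBy("id").limit(200).rdd.collect()

    println(topKInSort.mkString("[", ",", "]"))
    println(topKInMemory.mkString("[", ",", "]"))
    assert(topKInMemory sameElements topKInSort)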


The issue:

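`CollectLimitExec`'s core idea is `sortedRDD.mapPartitionsInternal(_.take(limit)).repartition(1).mapPartitionsInternal(_.take(limit))`.
Because `repartition(1)` gathers the sorted partitions into a single partition without guaranteeing that the rows arrive in the
global sort order, the final `take(limit)` is not guaranteed to return the first `limit` rows, so we cannot simply fall back to `CollectLimitExec`.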
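To illustrate why per-partition `take(limit)` followed by `repartition(1)` can go wrong, here is a minimal sketch on plain Scala collections (no Spark). It assumes the sorted data is range-partitioned, so each partition holds a sorted, disjoint key range, and it models the unspecified read order of the single-partition shuffle with an explicit `fetchOrder`. `CollectLimitSketch`, `fallbackTake` and `globalTake` are hypothetical names.

```scala
object CollectLimitSketch {
  // Models sortedRDD.mapPartitions(_.take(limit)).repartition(1).mapPartitions(_.take(limit)).
  // `fetchOrder` is a hypothetical stand-in for the order in which the single
  // output partition happens to read the upstream partitions.
  def fallbackTake(partitions: Seq[Seq[Int]], limit: Int, fetchOrder: Seq[Int]): Seq[Int] = {
    val locallyLimited = partitions.map(_.take(limit))                   // per-partition take(limit)
    val singlePartition = fetchOrder.flatMap(i => locallyLimited(i))     // repartition(1)
    singlePartition.take(limit)                                          // final take(limit)
  }

  // Reference answer: the first `limit` rows of the global ordering.
  def globalTake(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] =
    partitions.flatten.sorted.take(limit)

  def main(args: Array[String]): Unit = {
    // Three range partitions: each sorted, with increasing, disjoint key ranges.
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    println(fallbackTake(partitions, 4, Seq(0, 1, 2)).mkString(",")) // 1,2,3,4
    println(fallbackTake(partitions, 4, Seq(2, 1, 0)).mkString(",")) // 7,8,9,4
    println(globalTake(partitions, 4).mkString(","))                 // 1,2,3,4
  }
}
```

The result is only correct when the partitions happen to be read in their sorted order; any other read order makes the final `take` pick rows from the wrong partitions.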


Proposal to fix:
 # Revert the fallback logic.
 # Implement `CollectLimitExec` similarly to `CollectTailExec`; however, this may not be suitable
when the k in TopK is large.
 # Do a calculation similar to `CollectTailExec`'s, but collect only the record count of each
partition, and then use that information to take the exact number of records needed from each partition.
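Once the per-partition record counts are known, option 3 reduces to simple arithmetic. The following is a minimal sketch in plain Scala, assuming each partition is individually sorted and the partitions are already in global order (as after a range-partitioned sort); `ExactTakeSketch` and `takeCounts` are hypothetical names, not Spark APIs.

```scala
object ExactTakeSketch {
  // Given the record count of each sorted, globally ordered partition and a limit,
  // return how many records to take from each partition so that together they
  // form exactly the first `limit` records of the global ordering.
  def takeCounts(partitionCounts: Seq[Long], limit: Long): Seq[Long] = {
    // Number of records that precede each partition in the global order.
    val recordsBefore = partitionCounts.scanLeft(0L)(_ + _).init
    partitionCounts.zip(recordsBefore).map { case (count, before) =>
      // Take what is still missing, bounded by the partition's size and never negative.
      math.max(0L, math.min(count, limit - before))
    }
  }

  def main(args: Array[String]): Unit = {
    // Partition sizes 3, 5, 0, 4 with limit 6: take 3, then 3, then nothing.
    println(takeCounts(Seq(3L, 5L, 0L, 4L), 6L).mkString(",")) // 3,3,0,0
  }
}
```

Each partition i would then keep only its first `takeCounts(...)(i)` records, for example via `RDD.mapPartitionsWithIndex`, so no single-partition shuffle is needed to decide which rows belong to the result.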

> Sort by disk when number of limit is big in TakeOrderedAndProjectExec
> ---------------------------------------------------------------------
>                 Key: SPARK-24193
>                 URL:
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Jin Xing
>            Assignee: Jin Xing
>            Priority: Major
>             Fix For: 2.4.0
> Physical plan of "_select colA from t order by colB limit M_" is _TakeOrderedAndProject_;
> currently _TakeOrderedAndProject_ sorts data in memory, see
>
> Shall we add a config so that, if the limit (M) is too big, we sort on disk instead?
> That would resolve the memory issue.
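The memory concern above can be illustrated with a generic in-memory top-K that keeps the M smallest rows in a bounded max-heap, so its footprint grows with M rather than with the input size. This is a plain-Scala sketch of that general technique, not Spark's actual `TakeOrderedAndProjectExec` code; `InMemoryTopKSketch` and `smallestK` are hypothetical names.

```scala
import scala.collection.mutable

object InMemoryTopKSketch {
  // Keeps the `k` smallest values seen so far in a max-heap holding at most k
  // elements, so memory is proportional to k (the LIMIT M), not to the input.
  def smallestK(input: Iterator[Int], k: Int): Seq[Int] = {
    val heap = mutable.PriorityQueue.empty[Int] // max-heap: head is the largest kept value
    input.foreach { x =>
      if (heap.size < k) heap.enqueue(x)
      else if (k > 0 && x < heap.head) {
        heap.dequeue()   // evict the current largest kept value
        heap.enqueue(x)
      }
    }
    heap.toList.sorted
  }

  def main(args: Array[String]): Unit = {
    println(smallestK(Iterator(9, 1, 8, 2, 7, 3), 3).mkString(",")) // 1,2,3
  }
}
```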

This message was sent by Atlassian Jira
