spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Armbrust <mich...@databricks.com>
Subject Re: Spark SQL Query Plan optimization
Date Sun, 03 Aug 2014 01:49:02 GMT
The number of partitions (which decides the number of tasks) is fixed after
any shuffle and can be configured using 'spark.sql.shuffle.partitions'
though SQLConf (i.e. sqlContext.set(...) or
"SET spark.sql.shuffle.partitions=..." in sql)  It is possible we will auto
select this based on statistics in the future.

I think you might be reading the query plan backwards.  The data starts at
the bottom and moves upwards.  The filter is being performed before the
shuffle (exchange) and join operations.


On Fri, Aug 1, 2014 at 10:13 AM, N.Venkata Naga Ravi <nvn_ravi@hotmail.com>
wrote:

>  Hi,
>
> I am trying to understand the query plan and number of tasks /execution
> time created for joined query.
>
> Consider following example , creating two tables emp, sal with appropriate
> 100 records in each table with key for joining them.
>
> *EmpRDDRelation.scala*
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *case class EmpRecord(key: Int, value: String)case class SalRecord(key:
> Int, salary: Int)object EmpRDDRelation {  def main(args: Array[String])
> {    val sparkConf = new
> SparkConf().setMaster("local[1]").setAppName("RDDRelation")     val sc =
> new SparkContext(sparkConf)    val sqlContext = new SQLContext(sc)    //
> Importing the SQL context gives access to all the SQL functions and
> implicit conversions.    import sqlContext._        var rdd=
> sc.parallelize((1 to 100 ).map(i=>EmpRecord(i, s"name_$i")))
> rdd.registerAsTable("emp")    // Once tables have been registered, you can
> run SQL queries over them.     println("Result of SELECT *:")
> sql("SELECT * FROM emp").collect().foreach(println)            var salrdd =
> sc.parallelize((1 to 100).map(i=>SalRecord(i,i*100)))
> salrdd.registerAsTable("sal")      sql("SELECT * FROM
> sal").collect().foreach(println)         var salRRDFromSQL= sql("SELECT
> emp.key,value,salary from emp,sal WHERE  emp.key=30 AND emp.key=sal.key")
>     salRRDFromSQL.collect().foreach(println)         }}*
>
> Here are my observation :
>
> Below is query plan for above join query which creates 150 tasks. I could
> see Filter is added in the plan , but not sure whether taken in optimized
> way. First of all it is not clear why 150 tasks are required, because i
> could see similar 150 tasks when executed the above join query without
> filter "*emp.key=30" *like "*SELECT emp.key,value,salary from emp,sal
> WHERE  emp.key=sal.key"* and took *same time for both cases*. So my
> understanding emp.key =30 filter should take place first and on top of the
> filtered records from emp table it should join with sal table( From the
> Oracle RDBMS perspective) .  But here query plan joins tables first  and
> applies filter later.  Is there anyway we can improve it from code wise or
> does require enhancement from Spark SQL side.
>
> Please review my observation and let me know your comments.
>
>
> == Query Plan ==
> Project [key#0:0,value#1:1,salary#3:3]
>  HashJoin [key#0], [key#2], BuildRight
>
>
> *Exchange (HashPartitioning [key#0:0], 150)    Filter (key#0:0 = 30)*
> ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at
> basicOperators.scala:174
>   Exchange (HashPartitioning [key#2:0], 150)
>    ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at
> basicOperators.scala:174), which is now runnable
> 14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from
> Stage 2 (SchemaRDD[8] at RDD at SchemaRDD.scala:98
> == Query Plan ==
> Project [key#0:0,value#1:1,salary#3:3]
>  HashJoin [key#0], [key#2], BuildRight
>   Exchange (HashPartitioning [key#0:0], 150)
>    Filter (key#0:0 = 30)
>     ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at
> basicOperators.scala:174
>   Exchange (HashPartitioning [key#2:0], 150)
>    ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at
> basicOperators.scala:174)
> 14/08/01 22:20:02 INFO TaskSchedulerImpl: *Adding task set 2.0 with 150
> tasks*
>

Mime
View raw message