spark-user mailing list archives

From N.Venkata Naga Ravi <nvn_r...@hotmail.com>
Subject Spark SQL Query Plan optimization
Date Fri, 01 Aug 2014 17:13:41 GMT

Hi,

I am trying to understand the query plan, the number of tasks, and the execution time for a join
query.

Consider the following example, which creates two tables, emp and sal, each with 100 records and
a common key to join them on.

EmpRDDRelation.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class EmpRecord(key: Int, value: String)
case class SalRecord(key: Int, salary: Int)

object EmpRDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
    import sqlContext._

    val rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))
    rdd.registerAsTable("emp")

    // Once tables have been registered, you can run SQL queries over them.
    println("Result of SELECT *:")
    sql("SELECT * FROM emp").collect().foreach(println)

    val salrdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))
    salrdd.registerAsTable("sal")
    sql("SELECT * FROM sal").collect().foreach(println)

    val salRDDFromSQL =
      sql("SELECT emp.key, value, salary FROM emp, sal WHERE emp.key = 30 AND emp.key = sal.key")
    salRDDFromSQL.collect().foreach(println)

    sc.stop()
  }
}

Here are my observations:

Below is the query plan for the join query above, which creates 150 tasks. I can see that a
Filter appears in the plan, but I am not sure whether it is applied in an optimized way. First,
it is not clear why 150 tasks are required: I see the same 150 tasks, and the same execution
time, when I run the join without the filter "emp.key=30" (i.e. "SELECT emp.key,value,salary
FROM emp,sal WHERE emp.key=sal.key"). My understanding, from an Oracle RDBMS perspective, is
that the emp.key=30 filter should be applied first, and only the filtered records from the emp
table should then be joined with the sal table. But the query plan here appears to join the
tables first and apply the filter later. Is there any way to improve this on the code side, or
does it require an enhancement on the Spark SQL side?
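
To make the rewrite I expect concrete, here is a plain-Scala sketch (collections only, no Spark;
the names are mine, just for illustration) showing that filtering emp before the join returns
exactly the same rows as joining first and filtering afterwards, while probing far fewer rows:

```scala
object PushdownSketch {
  case class EmpRecord(key: Int, value: String)
  case class SalRecord(key: Int, salary: Int)

  val emp = (1 to 100).map(i => EmpRecord(i, s"name_$i"))
  val sal = (1 to 100).map(i => SalRecord(i, i * 100))

  // Join first, filter later: all 100 emp rows are probed against sal.
  val joinThenFilter = for {
    e <- emp
    s <- sal
    if e.key == s.key && e.key == 30
  } yield (e.key, e.value, s.salary)

  // Filter first, join later: a single emp row reaches the join.
  val filterThenJoin = for {
    e <- emp.filter(_.key == 30)
    s <- sal
    if e.key == s.key
  } yield (e.key, e.value, s.salary)

  def main(args: Array[String]): Unit = {
    // Both orders produce the same answer, so the rewrite is safe.
    assert(joinThenFilter == filterThenJoin)
    println(filterThenJoin) // Vector((30,name_30,3000))
  }
}
```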

Please review my observation and let me know your comments.


== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174),
which is now runnable
14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2 (SchemaRDD[8]
at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174)
14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks
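
On the task count: I suspect the 150 tasks come from the two Exchange operators, each of which
hash-partitions its input into 150 partitions, rather than from the join order; that would
explain why the count is the same with or without the filter. If this partition count is
configurable (assumption on my part: via the spark.sql.shuffle.partitions setting in newer
versions; it appears fixed in this build), something like the following fragment could shrink
it for small tables:

```scala
// Sketch, assuming a newer Spark version where the shuffle partition
// count is configurable; lowers the number of Exchange tasks.
sqlContext.sql("SET spark.sql.shuffle.partitions=10")
```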

