spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nathan McCarthy <Nathan.McCar...@quantium.com.au>
Subject Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?
Date Fri, 09 Jan 2015 07:37:46 GMT
Any ideas? :)

From: Nathan <nathan.mccarthy@quantium.com.au<mailto:nathan.mccarthy@quantium.com.au>>
Date: Wednesday, 7 January 2015 2:53 pm
To: "user@spark.apache.org<mailto:user@spark.apache.org>" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via rdd.mapPartitions(…).
Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 10 columns)
on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet”)
  t.registerTempTable("test1”)
  sqlC.cacheTable("test1”)

Now lets do some operations on it; I want the total sales & quantities sold for each hour
in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production ready code but
gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1”)
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for(r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

Now this takes around ~49 seconds… Even though test1 table is 100% cached. The number of
partitions remains the same…

Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, sales: Double)

Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map{ r => HourSum(r.getInt(1), r.getDouble(7),
r.getDouble(8)) }.cache()
//cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for(r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

This takes around 1.5 seconds! Albeit the memory footprint is much larger.

My thinking is that because SparkSQL does store things in a columnar format, there is some
unwrapping to be done out of the column array buffers which takes time and for some reason
this just takes longer when I switch out to map partitions (maybe its unwrapping the entire
row, even though I’m using just a subset of columns, or maybe there is some object creation/autoboxing
going on when calling getInt or getDouble)…

I’ve tried simpler cases too, like just summing sales. Running sum via SQL is fast (4.7
seconds), running a mapPartition sum on a double RDD is even faster (2.6 seconds). But MapPartitions
on the SchemaRDD;

sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => Iterator(iter.foldLeft(0.0)((t,r)
=> t+r.getDouble(0)))).sum

 takes a long time (33 seconds). In all these examples everything is fully cached in memory.
And yes for these kinds of operations I can use SQL, but for more complex queries I’d much
rather be using a combo of SparkSQL to select the data (so I get nice things like Parquet
pushdowns etc.) & functional Scala!

I think I’m doing something dumb… Is there something I should be doing to get faster performance
on MapPartitions on SchemaRDDs? Is there some unwrapping going on in the background that catalyst
does in a smart way that I’m missing?

Cheers,
~N

Nathan McCarthy
QUANTIUM
Level 25, 8 Chifley, 8-12 Chifley Square
Sydney NSW 2000

T: +61 2 8224 8922
F: +61 2 9292 6444

W: quantium.com.au<www.quantium.com.au>

________________________________

linkedin.com/company/quantium<www.linkedin.com/company/quantium>

facebook.com/QuantiumAustralia<www.facebook.com/QuantiumAustralia>

twitter.com/QuantiumAU<www.twitter.com/QuantiumAU>


The contents of this email, including attachments, may be confidential information. If you
are not the intended recipient, any use, disclosure or copying of the information is unauthorised.
If you have received this email in error, we would be grateful if you would notify us immediately
by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the message
from your system.

Mime
View raw message