spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cheng Lian <lian.cs....@gmail.com>
Subject Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?
Date Fri, 09 Jan 2015 14:59:59 GMT
Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are 
inlined below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:
> Hi,
>
> I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala 
> via rdd.mapPartitions(…). Using the latest release 1.2.0.
>
> Simple example; load up some sample data from parquet on HDFS (about 
> 380m rows, 10 columns) on a 7 node cluster.
>
>   val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet”)
> t.registerTempTable("test1”)
> sqlC.cacheTable("test1”)
>
> Now lets do some operations on it; I want the total sales & quantities 
> sold for each hour in the day so I choose 3 out of the 10 possible 
> columns...
>
> sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by 
> Hour").collect().foreach(println)
>
> After the table has been 100% cached in memory, this takes around 11 
> seconds.
>
> Lets do the same thing but via a MapPartitions call (this isn’t 
> production ready code but gets the job done).
>
>   val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1”)
> rddPC.mapPartitions { case hrs =>
>     val qtySum = new Array[Double](24)
>     val salesSum = new Array[Double](24)
>
>     for(r <- hrs) {
>       val hr = r.getInt(0)
> qtySum(hr) += r.getDouble(1)
> salesSum(hr) += r.getDouble(2)
>     }
>     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
> }.reduceByKey((a,b) => (a._1 + b._1, a._2 + 
> b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the 
for-loop. According to my early benchmark done with Scala 2.9, for-loop 
can be orders of magnitude slower than a simple while-loop, especially 
when the body of the loop only does something as trivial as this case. 
The reason is that Scala for-loop is translated into corresponding 
foreach/map/flatMap/withFilter function calls. And that's exactly why 
Spark SQL tries to avoid for-loop or any other functional style code in 
critical paths (where every row is touched), we also uses reusable 
mutable row objects instead of the immutable version to improve 
performance. You may check HiveTableScan, ParquetTableScan, 
InMemoryColumnarTableScan etc. for reference. Also, the `sum` function 
calls in your SQL code are translated into `o.a.s.s.execution.Aggregate` 
operators, which also use imperative while-loop and reusable mutable rows.

Another thing to notice is that the `hrs` iterator physically points to 
underlying in-memory columnar byte buffers, and the `for (r <- hrs) { 
... }` loop actually decompresses and extracts values from required byte 
buffers (this is the "unwrapping" processes you mentioned below).
>
> Now this takes around ~49 seconds… Even though test1 table is 100% 
> cached. The number of partitions remains the same…
>
> Now if I create a simple RDD of a case class HourSum(hour: Int, qty: 
> Double, sales: Double)
>
> Convert the SchemaRDD;
> val rdd = sqlC.sql("select * from test1").map{ r => 
> HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()
> //cache all the data
> rdd.count()
>
> Then run basically the same MapPartitions query;
>
> rdd.mapPartitions { case hrs =>
>   val qtySum = new Array[Double](24)
>   val salesSum = new Array[Double](24)
>
>   for(r <- hrs) {
>     val hr = r.hour
> qtySum(hr) += r.qty
> salesSum(hr) += r.sales
>   }
>   (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
> }.reduceByKey((a,b) => (a._1 + b._1, a._2 + 
> b._2)).collect().foreach(println)
>
> This takes around 1.5 seconds! Albeit the memory footprint is much larger.
I guess this 1.5 seconds doesn't include the time spent on caching the 
simple RDD? As I've explained above, in the first `mapPartitions` style 
snippet, columnar byte buffer unwrapping happens within the 
`mapPartitions` call. However, in this version, the unwrapping process 
happens when the `rdd.count()` action is performed. At that point, all 
values of all columns are extracted from underlying byte buffers, and 
the portion of data you need are then manually selected and transformed 
into the simple case class RDD via the `map` call.

If you include time spent on caching the simple case class RDD, it 
should be even slower than the first `mapPartitions` version.
>
> My thinking is that because SparkSQL does store things in a columnar 
> format, there is some unwrapping to be done out of the column array 
> buffers which takes time and for some reason this just takes longer 
> when I switch out to map partitions (maybe its unwrapping the entire 
> row, even though I’m using just a subset of columns, or maybe there is 
> some object creation/autoboxing going on when calling getInt or 
> getDouble)…
>
> I’ve tried simpler cases too, like just summing sales. Running sum via 
> SQL is fast (4.7 seconds), running a mapPartition sum on a double RDD 
> is even faster (2.6 seconds). But MapPartitions on the SchemaRDD;
>
> /sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => 
> Iterator(iter.foldLeft(0.0)((t,r) => t+r.getDouble(0)))).sum/
>
>  takes a long time (33 seconds). In all these examples everything is 
> fully cached in memory. And yes for these kinds of operations I can 
> use SQL, but for more complex queries I’d much rather be using a combo 
> of SparkSQL to select the data (so I get nice things like Parquet 
> pushdowns etc.) & functional Scala!
Again, unfortunately, functional style code like `Iterator.sum` and 
`Iterator.foldLeft` can be really slow on critical paths.
>
> I think I’m doing something dumb… Is there something I should be doing 
> to get faster performance on MapPartitions on SchemaRDDs? Is there 
> some unwrapping going on in the background that catalyst does in a 
> smart way that I’m missing?
It makes sense that people use both Spark SQL and Spark core, especially 
when Spark SQL lacks features users need (like window function, for 
now). The suggestion here is, if you really care about performance (more 
than code readability and maintenance cost), then avoid immutable, 
functional code whenever possible on any critical paths...
>
> Cheers,
> ~N
>
> Nathan McCarthy
> QUANTIUM
> Level 25, 8 Chifley, 8-12 Chifley Square
> Sydney NSW 2000
>
> T: +61 2 8224 8922
> F: +61 2 9292 6444
>
> W: quantium.com.au <www.quantium.com.au>
>
> ------------------------------------------------------------------------
>
> linkedin.com/company/quantium <www.linkedin.com/company/quantium>
>
> facebook.com/QuantiumAustralia <www.facebook.com/QuantiumAustralia>
>
> twitter.com/QuantiumAU <www.twitter.com/QuantiumAU>
>
> The contents of this email, including attachments, may be confidential 
> information. If you are not the intended recipient, any use, 
> disclosure or copying of the information is unauthorised. If you have
> received this email in error, we would be grateful if you would notify 
> us immediately by email reply, phone (+ 61 2 9292 6400) or fax (+ 61 2 
> 9292 6444) and delete the message from your system.


Mime
View raw message