spark-user mailing list archives

From Cheng Lian <lian.cs....@gmail.com>
Subject Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?
Date Tue, 20 Jan 2015 20:59:20 GMT
On 1/15/15 11:26 PM, Nathan McCarthy wrote:

> Thanks Cheng!
>
> Is there any API I can get access to (e.g. ParquetTableScan) which 
> would allow me to load up the low level/base RDD of just RDD[Row], so I 
> could avoid the defensive copy (maybe losing out on columnar storage etc.)?
>
> We have parts of our pipeline using SparkSQL/SchemaRDDs and others 
> using the core RDD api (mapPartitions etc.). Any tips?

(Michael has already answered this)

>
> Out of curiosity, a lot of SparkSQL operations seem to run in a 
> mapPartitions call (e.g. Distinct). Does a defensive copy happen there too?

Only if necessary. For example, `Sort` performs a defensive copy because 
it needs to cache rows for sorting.
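
To make this concrete, here is a rough sketch of the buffering pattern 
(not the actual Sort implementation; the method and names are 
illustrative). Scan operators reuse a single mutable row per partition, 
so any operator that holds on to rows must copy them first:

    import org.apache.spark.sql.catalyst.expressions.Row
    import scala.collection.mutable.ArrayBuffer

    // Sketch: buffer rows safely when the upstream iterator reuses one
    // mutable Row object for every record it produces.
    def bufferRows(rows: Iterator[Row]): Array[Row] = {
      val buffer = new ArrayBuffer[Row]
      while (rows.hasNext) {
        // Without copy(), every slot would reference the same reused
        // object and end up holding the values of the last row read.
        buffer += rows.next().copy()
      }
      buffer.toArray
    }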

>
> Keen to get the best performance and the best blend of SparkSQL and 
> functional Spark.
>
> Cheers,
> Nathan
>
> From: Cheng Lian <lian.cs.zju@gmail.com>
> Date: Monday, 12 January 2015 1:21 am
> To: Nathan <nathan.mccarthy@quantium.com.au>, Michael Armbrust <michael@databricks.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?
>
>
> On 1/11/15 1:40 PM, Nathan McCarthy wrote:
>> Thanks Cheng & Michael! Makes sense. Appreciate the tips!
>>
>> Idiomatic Scala isn’t performant. I’ll definitely start using while 
>> loops or tail-recursive methods. I have noticed this in the Spark 
>> code base.
>>
>> I might try turning off columnar compression (via 
>> `spark.sql.inMemoryColumnarStorage.compressed=false`, correct?) and 
>> see how performance compares to the primitive objects. Would you 
>> expect to see similar runtimes vs the primitive objects? We do have 
>> the luxury of lots of memory at the moment, so this might give us an 
>> additional performance boost.
> Turning off compression should be faster, but still slower than 
> directly using primitive objects, because Spark SQL still serializes 
> all values within a column into byte buffers in a compact format. 
> However, this radically reduces the number of Java objects on the heap 
> and is more GC friendly. When running large queries, the cost 
> introduced by GC can be significant.
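>
> For reference, a minimal way to try this, assuming the SQLContext is 
> named `sqlC` as in your snippets (the conf must be set before the 
> table is cached):
>
>     sqlC.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
>     sqlC.cacheTable("test1")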
>>
>> Regarding the defensive copying of row objects. Can we switch this 
>> off and just be aware of the risks? Is MapPartitions on SchemaRDDs 
>> and operating on the Row object the most performant way to be 
>> flipping between SQL & Scala user code? Is there anything else I 
>> could be doing?
> This can be very dangerous and error prone. Whenever an operator tries 
> to cache row objects, turning off defensive copying can produce wrong 
> query results. For example, sort-based shuffle caches rows for 
> sorting. In some cases, the sample operator may also cache row objects. 
> This is very implementation specific and may change between versions.
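>
> A sketch of the failure mode, assuming copying were switched off 
> (`schemaRdd` and the row-reuse behaviour here are hypothetical, purely 
> for illustration):
>
>     // Hypothetical: the scan reuses ONE mutable Row per partition, so
>     // without a defensive copy every buffered reference points at
>     // that same object.
>     val cached = schemaRdd.mapPartitions(iter => Iterator(iter.toArray))
>     // Each materialized array would then show only the values of the
>     // last row read in its partition -- a silently wrong result.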
>>
>> Cheers,
>> ~N
>>
>> From: Michael Armbrust <michael@databricks.com>
>> Date: Saturday, 10 January 2015 3:41 am
>> To: Cheng Lian <lian.cs.zju@gmail.com>
>> Cc: Nathan <nathan.mccarthy@quantium.com.au>, "user@spark.apache.org" <user@spark.apache.org>
>> Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?
>>
>> The other thing to note here is that Spark SQL defensively copies 
>> rows when we switch into user code.  This probably explains the 
>> difference between 1 & 2.
>>
>> The difference between 1 & 3 is likely the cost of decompressing the 
>> column buffers vs. accessing a bunch of uncompressed primitive objects.
>>
>> On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <lian.cs.zju@gmail.com> wrote:
>>
>>     Hey Nathan,
>>
>>     Thanks for sharing, this is a very interesting post :) My
>>     comments are inlined below.
>>
>>     Cheng
>>
>>     On 1/7/15 11:53 AM, Nathan McCarthy wrote:
>>>     Hi,
>>>
>>>     I’m trying to use a combination of SparkSQL and ‘normal’
>>>     Spark/Scala via rdd.mapPartitions(…), using the latest release,
>>>     1.2.0.
>>>
>>>     Simple example: load up some sample data from Parquet on HDFS
>>>     (about 380m rows, 10 columns) on a 7-node cluster.
>>>
>>>     val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
>>>     t.registerTempTable("test1")
>>>     sqlC.cacheTable("test1")
>>>
>>>     Now let’s do some operations on it; I want the total sales &
>>>     quantities sold for each hour in the day, so I choose 3 out of
>>>     the 10 possible columns...
>>>
>>>     sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group
>>>     by Hour").collect().foreach(println)
>>>
>>>     After the table has been 100% cached in memory, this takes
>>>     around 11 seconds.
>>>
>>>     Let’s do the same thing but via a mapPartitions call (this isn’t
>>>     production-ready code but gets the job done).
>>>
>>>     val rddPC = sqlC.sql("select Hour, ItemQty, Sales from test1")
>>>     rddPC.mapPartitions { hrs =>
>>>       val qtySum = new Array[Double](24)
>>>       val salesSum = new Array[Double](24)
>>>
>>>       for (r <- hrs) {
>>>         val hr = r.getInt(0)
>>>         qtySum(hr) += r.getDouble(1)
>>>         salesSum(hr) += r.getDouble(2)
>>>       }
>>>       (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>>>     }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
>>     I believe the evil thing that makes this snippet much slower is
>>     the for-loop. According to an early benchmark I did with Scala
>>     2.9, a for-loop can be orders of magnitude slower than a simple
>>     while-loop, especially when the loop body does something as
>>     trivial as it does here. The reason is that a Scala for-loop is
>>     translated into the corresponding foreach/map/flatMap/withFilter
>>     function calls. That's exactly why Spark SQL avoids for-loops and
>>     other functional-style code on critical paths (where every row is
>>     touched); it also uses reusable mutable row objects instead of
>>     immutable ones to improve performance. You may check
>>     HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc.
>>     for reference. Also, the `sum` function calls in your SQL code are
>>     translated into `o.a.s.s.execution.Aggregate` operators, which
>>     also use imperative while-loops and reusable mutable rows.
>>
>>     Another thing to notice is that the `hrs` iterator physically
>>     points to the underlying in-memory columnar byte buffers, and the
>>     `for (r <- hrs) { ... }` loop actually decompresses and extracts
>>     values from the required byte buffers (this is the "unwrapping"
>>     process you mentioned below).
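>>
>>     For example, your first `mapPartitions` snippet rewritten with a
>>     while-loop (a sketch of the same logic with imperative iteration;
>>     `rddPC` as in your code):
>>
>>         rddPC.mapPartitions { hrs =>
>>           val qtySum = new Array[Double](24)
>>           val salesSum = new Array[Double](24)
>>           // Imperative iteration avoids the closure call per row that
>>           // the for-comprehension introduces on this hot path.
>>           while (hrs.hasNext) {
>>             val r = hrs.next()
>>             val hr = r.getInt(0)
>>             qtySum(hr) += r.getDouble(1)
>>             salesSum(hr) += r.getDouble(2)
>>           }
>>           (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>>         }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
>>           .collect().foreach(println)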
>>>
>>>     Now this takes ~49 seconds… even though the test1 table is
>>>     100% cached. The number of partitions remains the same…
>>>
>>>     Now if I create a simple RDD of a case class HourSum(hour: Int,
>>>     qty: Double, sales: Double)
>>>
>>>     Convert the SchemaRDD:
>>>     val rdd = sqlC.sql("select * from test1").map { r =>
>>>       HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8))
>>>     }.cache() // cache all the data
>>>     rdd.count()
>>>
>>>     Then run basically the same MapPartitions query;
>>>
>>>     rdd.mapPartitions { hrs =>
>>>       val qtySum = new Array[Double](24)
>>>       val salesSum = new Array[Double](24)
>>>
>>>       for (r <- hrs) {
>>>         val hr = r.hour
>>>         qtySum(hr) += r.qty
>>>         salesSum(hr) += r.sales
>>>       }
>>>       (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
>>>     }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
>>>
>>>     This takes around 1.5 seconds! Albeit the memory footprint is
>>>     much larger.
>>     I guess this 1.5 seconds doesn't include the time spent on
>>     caching the simple RDD? As I've explained above, in the first
>>     `mapPartitions` style snippet, columnar byte buffer unwrapping
>>     happens within the `mapPartitions` call. However, in this
>>     version, the unwrapping process happens when the `rdd.count()`
>>     action is performed. At that point, all values of all columns are
>>     extracted from the underlying byte buffers, and the portion of
>>     data you need is then manually selected and transformed into the
>>     simple case class RDD via the `map` call.
>>
>>     If you include the time spent on caching the simple case class
>>     RDD, it should be even slower than the first `mapPartitions`
>>     version.
>>>
>>>     My thinking is that because SparkSQL does store things in a
>>>     columnar format, there is some unwrapping to be done out of the
>>>     column array buffers, which takes time, and for some reason this
>>>     just takes longer when I switch out to mapPartitions (maybe it’s
>>>     unwrapping the entire row even though I’m using just a subset of
>>>     columns, or maybe there is some object creation/autoboxing going
>>>     on when calling getInt or getDouble)…
>>>
>>>     I’ve tried simpler cases too, like just summing sales. Running
>>>     the sum via SQL is fast (4.7 seconds), and running a
>>>     mapPartitions sum on a double RDD is even faster (2.6 seconds).
>>>     But mapPartitions on the SchemaRDD:
>>>
>>>     sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
>>>       Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum
>>>
>>>     This takes a long time (33 seconds). In all these examples
>>>     everything is fully cached in memory. And yes, for these kinds of
>>>     operations I can use SQL, but for more complex queries I’d much
>>>     rather use a combo of SparkSQL to select the data (so I get
>>>     nice things like Parquet pushdowns etc.) & functional Scala!
>>     Again, unfortunately, functional-style code like `Iterator.sum`
>>     and `Iterator.foldLeft` can be really slow on critical paths.
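>>
>>     A while-loop equivalent of the fold, as a sketch:
>>
>>         sqlC.sql("select SalesInclGST from test1").mapPartitions { iter =>
>>           var total = 0.0
>>           while (iter.hasNext) total += iter.next().getDouble(0)
>>           Iterator(total) // one partial sum per partition
>>         }.sum()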
>>>
>>>     I think I’m doing something dumb… Is there something I should be
>>>     doing to get faster performance from mapPartitions on SchemaRDDs?
>>>     Is there some unwrapping going on in the background that
>>>     Catalyst does in a smart way that I’m missing?
>>     It makes sense that people use both Spark SQL and Spark core,
>>     especially when Spark SQL lacks features users need (like window
>>     functions, for now). The suggestion here is: if you really care
>>     about performance (more than code readability and maintenance
>>     cost), then avoid immutable, functional-style code whenever
>>     possible on any critical path...
>>>
>>>     Cheers,
>>>     ~N
>>>
>>>     Nathan McCarthy
>>>     QUANTIUM
>>>     Level 25, 8 Chifley, 8-12 Chifley Square
>>>     Sydney NSW 2000
>>>
>>>     T: +61 2 8224 8922
>>>     F: +61 2 9292 6444
>>>     W: quantium.com.au
>>
>>
>
