spark-user mailing list archives

From Nathan McCarthy <Nathan.McCar...@quantium.com.au>
Subject Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?
Date Fri, 16 Jan 2015 07:26:55 GMT
Thanks Cheng!

Is there any API I can get access to (e.g. ParquetTableScan) which would allow me to load
up the low-level/base RDD of just RDD[Row], so I could avoid the defensive copy (maybe losing
out on columnar storage etc.)?

We have parts of our pipeline using SparkSQL/SchemaRDDs and others using the core RDD api
(mapPartitions etc.). Any tips?

Out of curiosity, a lot of SparkSQL functions seem to run in a mapPartitions call (e.g. Distinct).
Does a defensive copy happen there too?

Keen to get the best performance and the best blend of SparkSQL and functional Spark.

Cheers,
Nathan

From: Cheng Lian <lian.cs.zju@gmail.com>
Date: Monday, 12 January 2015 1:21 am
To: Nathan <nathan.mccarthy@quantium.com.au>, Michael Armbrust <michael@databricks.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?


On 1/11/15 1:40 PM, Nathan McCarthy wrote:
Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic Scala isn't performant. I'll definitely start using while loops or tail-recursive
methods. I've noticed this in the Spark code base.

I might try turning off columnar compression (via spark.sql.inMemoryColumnarStorage.compressed=false
correct?) and see how performance compares to the primitive objects. Would you expect to see
similar runtimes vs the primitive objects? We do have the luxury of lots of memory at the
moment so this might give us an additional performance boost.
Turning off compression should be faster, but still slower than directly using primitive objects,
because Spark SQL also serializes all objects within a column into byte buffers in a compact
format. However, this radically reduces the number of Java objects on the heap and is more GC
friendly. When running large queries, the cost introduced by GC can be significant.
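
A quick sketch of flipping that setting before caching (assuming the same SQLContext `sqlC`
as in your snippets below):

  // Disable in-memory columnar compression; trades heap usage for CPU.
  // Set this before cacheTable(), since the buffers are built at cache time.
  sqlC.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
  sqlC.cacheTable("test1")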

Regarding the defensive copying of row objects. Can we switch this off and just be aware of
the risks? Is MapPartitions on SchemaRDDs and operating on the Row object the most performant
way to be flipping between SQL & Scala user code? Is there anything else I could be doing?
This can be very dangerous and error prone. Whenever an operator caches row objects, turning
off defensive copying can produce wrong query results. For example, sort-based shuffle caches
rows in order to sort them. In some cases, the sample operator may also cache row objects. This
is very implementation specific and may change between versions.
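
If copying were ever made optional, user code that buffers rows would also have to copy them
explicitly, since buffered references may all alias the same reused mutable row. A hypothetical
sketch (assuming the Row.copy() method is available in your version, and a SchemaRDD named
`schemaRdd`):

  // Copy each row before buffering; without .copy(), every buffered
  // element could point at the same mutable row object.
  val buffered = schemaRdd.mapPartitions { iter =>
    iter.map(_.copy()).toArray.iterator
  }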

Cheers,
~N

From: Michael Armbrust <michael@databricks.com>
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian <lian.cs.zju@gmail.com>
Cc: Nathan <nathan.mccarthy@quantium.com.au>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

The other thing to note here is that Spark SQL defensively copies rows when we switch into
user code.  This probably explains the difference between 1 & 2.

The difference between 1 & 3 is likely the cost of decompressing the column buffers vs.
accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <lian.cs.zju@gmail.com> wrote:
Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are inlined below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:
Hi,

I'm trying to use a combination of SparkSQL and 'normal' Spark/Scala via rdd.mapPartitions(…),
using the latest release, 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 10 columns)
on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales & quantities sold for each hour
in the day, so I choose 3 of the 10 available columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Let's do the same thing, but via a mapPartitions call (this isn't production-ready code, but
it gets the job done).

  val rddPC = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for(r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the for-loop. According to
an early benchmark I did with Scala 2.9, a for-loop can be orders of magnitude slower than a
simple while-loop, especially when the loop body is as trivial as it is here. The reason is
that a Scala for-loop is translated into the corresponding foreach/map/flatMap/withFilter
function calls. That's exactly why Spark SQL tries to avoid for-loops and other functional
style code on critical paths (where every row is touched); it also uses reusable mutable row
objects instead of immutable ones to improve performance. You may check HiveTableScan,
ParquetTableScan, InMemoryColumnarTableScan etc. for reference. Also, the `sum` function calls
in your SQL code are translated into `o.a.s.s.execution.Aggregate` operators, which likewise
use imperative while-loops and reusable mutable rows.
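
For example, your per-partition loop could be rewritten imperatively. A sketch of the same
aggregation with a while-loop (untested):

  rddPC.mapPartitions { hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    // A plain while-loop over the iterator avoids the closure calls
    // the for-comprehension desugars into.
    while (hrs.hasNext) {
      val r = hrs.next()
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)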

Another thing to notice is that the `hrs` iterator physically points to the underlying
in-memory columnar byte buffers, and the `for (r <- hrs) { ... }` loop actually decompresses
and extracts values from the required byte buffers (this is the "unwrapping" process you
mentioned below).

Now this takes around 49 seconds… even though the test1 table is 100% cached. The number of
partitions remains the same…

Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double, sales: Double)

Convert the SchemaRDD;
val rdd = sqlC.sql("select * from test1").map { r =>
  HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8))
}.cache()
// cache all the data
rdd.count()

Then run basically the same MapPartitions query;

rdd.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for(r <- hrs) {
    val hr = r.hour
    qtySum(hr) += r.qty
    salesSum(hr) += r.sales
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)

This takes around 1.5 seconds! Albeit the memory footprint is much larger.
I guess this 1.5 seconds doesn't include the time spent on caching the simple RDD? As I've
explained above, in the first `mapPartitions` style snippet, columnar byte buffer unwrapping
happens within the `mapPartitions` call. However, in this version, the unwrapping process
happens when the `rdd.count()` action is performed. At that point, all values of all columns
are extracted from the underlying byte buffers, and the portion of the data you need is then
manually selected and transformed into the simple case class RDD via the `map` call.

If you include the time spent on caching the simple case class RDD, it should be even slower
than the first `mapPartitions` version.

My thinking is that because SparkSQL does store things in a columnar format, there is some
unwrapping to be done out of the column array buffers, which takes time, and for some reason
this just takes longer when I switch out to mapPartitions (maybe it's unwrapping the entire
row even though I'm using just a subset of columns, or maybe there is some object
creation/autoboxing going on when calling getInt or getDouble)…

I've tried simpler cases too, like just summing sales. Running the sum via SQL is fast (4.7
seconds), and running a mapPartitions sum on a double RDD is even faster (2.6 seconds). But
mapPartitions on the SchemaRDD;

sqlC.sql("select SalesInclGST from test1").mapPartitions(iter =>
  Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum

takes a long time (33 seconds). In all these examples everything is fully cached in memory.
And yes for these kinds of operations I can use SQL, but for more complex queries I’d much
rather be using a combo of SparkSQL to select the data (so I get nice things like Parquet
pushdowns etc.) & functional Scala!
Again, unfortunately, functional-style code like `Iterator.sum` and `Iterator.foldLeft` can
be really slow on critical paths.
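
A sketch of the same sum with an imperative loop instead (untested):

  sqlC.sql("select SalesInclGST from test1").mapPartitions { iter =>
    var total = 0.0
    // Accumulate with a plain while-loop instead of foldLeft.
    while (iter.hasNext) {
      total += iter.next().getDouble(0)
    }
    Iterator(total)
  }.sum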

I think I'm doing something dumb… Is there something I should be doing to get faster
performance from mapPartitions on SchemaRDDs? Is there some unwrapping going on in the
background that Catalyst handles in a smart way that I'm missing?
It makes sense that people use both Spark SQL and Spark core, especially when Spark SQL lacks
features users need (like window functions, for now). The suggestion here is: if you really
care about performance (more than code readability and maintenance cost), then avoid immutable,
functional code whenever possible on any critical paths...

Cheers,
~N

Nathan McCarthy
QUANTIUM
Level 25, 8 Chifley, 8-12 Chifley Square
Sydney NSW 2000

T: +61 2 8224 8922
F: +61 2 9292 6444

W: quantium.com.au

________________________________

linkedin.com/company/quantium

facebook.com/QuantiumAustralia

twitter.com/QuantiumAU



