spark-dev mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Preliminary Parquet numbers and including .count() in Catalyst
Date Tue, 13 May 2014 04:46:13 GMT
Hi Spark devs,

First of all, huge congrats on the parquet integration with SparkSQL!  This
is an incredible direction forward and something I can see being very
broadly useful.

I was doing some preliminary tests to see how it works with one of my
workflows, and wanted to share some numbers that people might want to know
about.

I also wanted to point out that .count() doesn't seem integrated with the
rest of the optimization framework, and some big gains could be possible.


So, the numbers:

I took a table extracted from a SQL database and stored in HDFS:

   - 115 columns (several always-empty, mostly strings, some enums, some
   numbers)
   - 253,887,080 rows
   - 182,150,295,881 bytes (raw uncompressed)
   - 42,826,820,222 bytes (lzo compressed with .index file)

And I converted it to Parquet using SparkSQL's SchemaRDD.saveAsParquetFile()
call:

   - Converting from .lzo in HDFS to .parquet in HDFS took 635s using 42
   cores across 4 machines
   - 17,517,922,117 bytes (parquet per SparkSQL defaults)

So the Parquet output is about 41% of the .lzo size and under 10% (about
9.6%) of the raw uncompressed size.  Nice!
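
For anyone who wants to check the ratios, here is a small sketch that
recomputes them from the byte counts listed above. It is plain Scala with no
Spark dependency; the object and method names are made up for illustration.

```scala
object SizeRatios {
  // Byte counts exactly as reported in this message.
  val rawBytes: Long     = 182150295881L // raw uncompressed
  val lzoBytes: Long     = 42826820222L  // lzo compressed
  val parquetBytes: Long = 17517922117L  // parquet, SparkSQL defaults

  // Size of `part` expressed as a percentage of `whole`.
  def percentOf(part: Long, whole: Long): Double = 100.0 * part / whole

  def main(args: Array[String]): Unit = {
    println(f"parquet as %% of lzo: ${percentOf(parquetBytes, lzoBytes)}%.1f")
    println(f"parquet as %% of raw: ${percentOf(parquetBytes, rawBytes)}%.1f")
    println(f"lzo as %% of raw:     ${percentOf(lzoBytes, rawBytes)}%.1f")
  }
}
```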


I then did some basic interactions on it:

*Row count*

   - LZO
      - lzoFile("/path/to/lzo").count
      - 31.632305953s
   - Parquet
      - sqlContext.parquetFile("/path/to/parquet").count
      - 289.129487003s

Reassembling rows from the separate column storage is clearly really
expensive.  Median task length is 33s for Parquet vs 4s for LZO, and of that
33s in each Parquet task (319 tasks total) about 1.75 seconds are spent in GC
(inefficient object allocation?)



*Count number of rows with a particular key:*

   - LZO
      - lzoFile("/path/to/lzo").filter(_.split("\\|")(0) == "1234567890").count
      - 73.988897511s
   - Parquet
      - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).count
      - 293.410470418s
   - Parquet (hand-tuned to count on just one column)
      - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).select('IDCOL).count
      - 1.160449187s

It looks like .count() on parquet is currently handled very inefficiently,
with all the columns materialized.  But if I select just the relevant column
and then count, the column-oriented storage of Parquet really shines.

There ought to be a potential optimization here such that a .count() on a
SchemaRDD backed by Parquet doesn't require re-assembling the rows, as
that's expensive.  I don't think .count() is handled specially in
SchemaRDDs, but it seems ripe for optimization.
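
To make the idea concrete, here is a toy sketch of that kind of rewrite. It
does not use Catalyst's real classes or rule API: Plan, ParquetScan, Filter,
Count and PruneForCount are all hypothetical names, and the fallback of
reading the first column when nothing is referenced is just an assumption
for illustration.

```scala
// Toy logical plan. These are NOT Catalyst classes; every name is hypothetical.
sealed trait Plan
case class ParquetScan(path: String,
                       allColumns: Seq[String],
                       readColumns: Seq[String]) extends Plan
case class Filter(column: String, value: Long, child: Plan) extends Plan
case class Count(child: Plan) extends Plan

object PruneForCount {
  // Walk down from a Count, collecting the columns that operators above the
  // scan actually reference, then restrict the scan to those columns.
  private def prune(plan: Plan, needed: Set[String]): Plan = plan match {
    case Count(child) =>
      // Counting needs row existence only, not any column values.
      Count(prune(child, needed))
    case Filter(column, value, child) =>
      // The filter must still see the column it tests.
      Filter(column, value, prune(child, needed + column))
    case scan: ParquetScan =>
      val keep = scan.allColumns.filter(needed.contains)
      // Assumption: the scan has to read at least one column to produce
      // rows, so fall back to the first one if nothing is referenced.
      scan.copy(readColumns = if (keep.nonEmpty) keep else scan.allColumns.take(1))
  }

  // Only plans rooted at Count are rewritten; anything else is left alone.
  def optimize(plan: Plan): Plan = plan match {
    case count: Count => prune(count, Set.empty)
    case other        => other
  }
}
```

With a scan over columns COL, IDCOL and OTHER, optimizing
Count(Filter("COL", 1234567890L, scan)) leaves a scan that reads only COL,
which is roughly what my manual .select does by hand.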


*Count number of distinct values in a column*

   - LZO
      - lzoFile("/path/to/lzo").map(sel(0)).distinct.count
      - 115.582916866s
   - Parquet
      - sqlContext.parquetFile("/path/to/parquet").select('COL).distinct.count
      - 16.839004826s

It turns out column selectivity is very useful!  I'm guessing that if I
could get byte counts read out of HDFS, that would just about match up with
the difference in read times.
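
To put the timings side by side, here is a small sketch that turns the
wall-clock numbers above into ratios. It is plain Scala with no Spark
dependency, and the object and value names are made up for illustration.

```scala
object TimingRatios {
  // Wall-clock seconds exactly as reported in this message.
  val lzoCount: Double                 = 31.632305953
  val parquetCount: Double             = 289.129487003
  val lzoKeyCount: Double              = 73.988897511
  val parquetKeyCount: Double          = 293.410470418
  val parquetKeyCountOneColumn: Double = 1.160449187
  val lzoDistinct: Double              = 115.582916866
  val parquetDistinct: Double          = 16.839004826

  // How many times longer the slower run took than the faster one.
  def ratio(slower: Double, faster: Double): Double = slower / faster

  def main(args: Array[String]): Unit = {
    println(f"row count, Parquet vs LZO:           ${ratio(parquetCount, lzoCount)}%.1fx slower")
    println(f"key count, Parquet vs LZO:           ${ratio(parquetKeyCount, lzoKeyCount)}%.1fx slower")
    println(f"key count, LZO vs one-column Parquet: ${ratio(lzoKeyCount, parquetKeyCountOneColumn)}%.1fx slower")
    println(f"distinct, LZO vs Parquet:            ${ratio(lzoDistinct, parquetDistinct)}%.1fx slower")
  }
}
```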




Any thoughts on how to embed the knowledge of my hand-tuned additional
.select('IDCOL) into Catalyst?


Thanks again for all the hard work and prep for the 1.0 release!

Andrew
