spark-user mailing list archives

From Robert Kruszewski <>
Subject Re: Expected benefit of parquet filter pushdown?
Date Wed, 31 Aug 2016 22:41:37 GMT
Your statistics seem corrupted. The creator field doesn’t match the version spec and as such
parquet is not using the statistics to filter. I would check whether you have references to
PARQUET-251 or PARQUET-297 in your executor logs. This bug existed between parquet 1.5.0 and
1.8.0. Only master of Spark has parquet >= 1.8.0.

Also check out VersionParser in parquet, since your createdBy is invalid and, even with a
fixed parquet, the file's statistics will still be deemed corrupted.
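
To make the createdBy point concrete, here is a minimal sketch of the kind of check involved. The pattern below is a simplified assumption about the expected shape ("<application> version <x.y.z> (build <hash>)"), not the regex VersionParser uses, and `stats_trusted` is a hypothetical helper that simply applies the 1.8.0 cutoff mentioned above.

```python
import re

# Assumed, simplified shape of a well-formed createdBy string:
#   "<application> version <major>.<minor>.<patch>[suffix] (build <hash>)"
# Illustrative only; this is not the pattern parquet's VersionParser uses.
CREATED_BY = re.compile(
    r"^(?P<app>.+?) version "
    r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)\S* "
    r"\(build ?(?P<build>[^)]*)\)$"
)


def stats_trusted(created_by):
    """Hypothetical helper: decide whether column statistics should be used.

    Statistics are ignored when the string cannot be parsed, or when the
    writer version is older than 1.8.0 (the cutoff assumed from this thread).
    """
    match = CREATED_BY.match(created_by)
    if match is None:
        # Unparseable createdBy: treat the statistics as corrupted.
        return False
    version = tuple(int(match.group(k)) for k in ("major", "minor", "patch"))
    return version >= (1, 8, 0)


# The creator string from the parquet-tools output has no "version x.y.z" part,
# so under this sketch it does not parse and its statistics are not trusted.
creator = "parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)"
print(stats_trusted(creator))
```

Under these assumptions, a file with such a creator string gets no statistics-based filtering regardless of which parquet version reads it.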

-          Robert

On 8/31/16, 10:29 PM, Christon DeWan wrote:

    I have a data set stored in parquet with several short key fields and one relatively large
(several kb) blob field. The data set is sorted by key1, key2.
        message spark_schema {
          optional binary key1 (UTF8);
          optional binary key2;
          optional binary blob;
        }
    One use case of this dataset is to fetch all the blobs for a given predicate of key1,
key2. I would expect parquet predicate pushdown to help greatly by not reading blobs from
rowgroups where the predicate on the keys matched zero records. That does not appear to be
the case, however.
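
    The expectation described above can be sketched roughly as follows. This is a toy model of row-group skipping by min/max statistics, not Parquet's or Spark's implementation; `RowGroup`, `may_contain`, `blob_bytes_to_read` and all of the numbers are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RowGroup:
    # Hypothetical per-row-group metadata: min/max of key1 and the blob size.
    key1_min: Optional[str]
    key1_max: Optional[str]
    blob_bytes: int


def may_contain(rg: RowGroup, value: str) -> bool:
    """Return False only when the statistics prove the value is absent."""
    if rg.key1_min is None or rg.key1_max is None:
        # No usable statistics: the row group cannot be skipped.
        return True
    return rg.key1_min <= value <= rg.key1_max


def blob_bytes_to_read(row_groups: List[RowGroup], value: str) -> int:
    """Sum the blob bytes of every row group that cannot be ruled out."""
    return sum(rg.blob_bytes for rg in row_groups if may_contain(rg, value))


# Data sorted by key1, so the ranges do not overlap (made-up values).
with_stats = [
    RowGroup("a", "f", 100),
    RowGroup("g", "m", 100),
    RowGroup("n", "z", 100),
]
# The same row groups with statistics missing or ignored.
without_stats = [RowGroup(None, None, rg.blob_bytes) for rg in with_stats]

print(blob_bytes_to_read(with_stats, "h"))     # only the middle row group
print(blob_bytes_to_read(without_stats, "h"))  # every row group
```

    In this model, sorting by the key only helps when the reader can actually use the per-row-group statistics; without them, every blob chunk is read.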
    For a predicate that only returns 2 rows (out of 6 million), this query:
    	select sum(length(blob)) from t2 where key1 = 'rare value'
    takes 5x longer and reads 50x more data (according to the web UI) than this query:
    	select sum(length(key2)) from t2 where key1 = 'rare value'
    The parquet scan does appear to be getting the predicate (says explain(), see below),
and those columns do even appear to be dictionary encoded (see further below).
    So does filter pushdown not actually allow us to read less data or is there something
wrong with my setup?
    scala> spark.sql("select sum(length(blob)) from t2 where key1 = 'rare value'").explain()
    == Physical Plan ==
    *HashAggregate(keys=[], functions=[sum(cast(length(blob#48) as bigint))])
    +- Exchange SinglePartition
       +- *HashAggregate(keys=[], functions=[partial_sum(cast(length(blob#48) as bigint))])
          +- *Project [blob#48]
             +- *Filter (isnotnull(key1#46) && (key1#46 = rare value))
                +- *BatchedScan parquet [key1#46,blob#48] Format: ParquetFormat, InputPaths:
hdfs://nameservice1/user/me/parquet_test/blob, PushedFilters: [IsNotNull(key1), EqualTo(key1,rare
value)], ReadSchema: struct<key1:string,blob:binary>
    $ parquet-tools meta example.snappy.parquet 
    creator:     parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d) 
    extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"key1","type":"string","nullable":true,"metadata":{}},{"name":"key2","type":"binary","nullable":true,"metadata":{}},{"
    file schema: spark_schema 
    key1:        OPTIONAL BINARY O:UTF8 R:0 D:1
    key2:        OPTIONAL BINARY R:0 D:1
    blob:        OPTIONAL BINARY R:0 D:1
    row group 1: RC:3971 TS:320593029 
    key2:         BINARY SNAPPY DO:0 FPO:88 SZ:49582/53233/1.07 VC:3971 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
    blob:         BINARY SNAPPY DO:0 FPO:49670 SZ:134006918/320539716/2.39 VC:3971 ENC:BIT_PACKED,RLE,PLAIN