spark-user mailing list archives

From Cheng Lian <lian.cs....@gmail.com>
Subject Re: [SparkSQL] Try2: Parquet predicate pushdown troubles
Date Thu, 29 Jan 2015 01:15:10 GMT
Hey Yana,

An update about this Parquet filter push-down issue. It turned out to be
a bit complicated, but (hopefully) it's all clear now.

 1.

    Yesterday I found a bug in Parquet which essentially disables row
    group filtering for almost all |AND| predicates (a rough sketch of
    such a predicate follows the links below).

      * JIRA ticket: PARQUET-173
        <https://issues.apache.org/jira/browse/PARQUET-173>
      * PR (not merged yet): PR #108
        <https://github.com/apache/incubator-parquet-mr/pull/108>
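
    To make the affected case concrete, here is a rough sketch (my own
    illustration, not code from Spark or Parquet; the column names are
    just the ones used in this thread) of the kind of |AND| predicate
    that PARQUET-173 prevents from dropping row groups, built with the
    parquet-mr 1.6.0rc3 filter2 API:

    import parquet.filter2.predicate.FilterApi.{and, eq => eqTo, gt, intColumn, longColumn}
    import parquet.filter2.predicate.FilterPredicate

    // key = 0 AND epoch > 1417384800: row group statistics should let Parquet
    // drop whole row groups that cannot satisfy this conjunction, but due to
    // PARQUET-173 the AND node never reports a row group as droppable.
    val pred: FilterPredicate = and(
      eqTo(intColumn("key"), Int.box(0)),
      gt(longColumn("epoch"), Long.box(1417384800L)))
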
 2.

    I verified that filter push-down is actually enabled even when we set
    |parquet.task.side.metadata| to |true|.

    The actual filtering happens when |ParquetRecordReader.initialize()|
    is called in |NewHadoopRDD.compute|. See here
    <https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L135>
    and here
    <https://github.com/apache/incubator-parquet-mr/blob/parquet-1.6.0rc3/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java#L157-L158>.
    However, due to the PARQUET-173 bug mentioned above, no row group can
    be dropped, because you were using an |AND| predicate.
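
    (For the record, this is a minimal sketch of the mechanism the task
    side relies on, written against the parquet-mr 1.6.0rc3 API and
    independent of Spark; it assumes |ParquetInputFormat.getFilter| from
    that version:)

    import org.apache.hadoop.conf.Configuration
    import parquet.filter2.compat.FilterCompat
    import parquet.filter2.predicate.FilterApi.{eq => eqTo, intColumn}
    import parquet.hadoop.ParquetInputFormat

    val conf = new Configuration()
    // Roughly what ParquetTableScan does on the driver side: serialize the
    // predicate into the Hadoop Configuration.
    ParquetInputFormat.setFilterPredicate(conf, eqTo(intColumn("key"), Int.box(0)))
    // Roughly what ParquetRecordReader.initialize() does on the task side:
    // read the predicate back and use it to filter row groups and records.
    val filter: FilterCompat.Filter = ParquetInputFormat.getFilter(conf)
    println(filter) // wraps eq(key, 0)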

    As for the Spark task input size: it seems that Hadoop |FileSystem|
    adds the size of a whole block to the metrics even if you only touch
    a fraction of it (when reading the Parquet metadata in the footer,
    for example). This behavior can be verified with the following
    snippet:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sc._
    import sqlContext._

    case class KeyValue(key: Int, value: String)

    // Write a reasonably large Parquet file.
    parallelize(1 to 1024 * 1024 * 20).
      flatMap(i => Seq.fill(10)(KeyValue(i, i.toString))).
      saveAsParquetFile("large.parquet")

    hadoopConfiguration.set("parquet.task.side.metadata", "true")
    sql("SET spark.sql.parquet.filterPushdown=true")

    // Scan with a pushed-down predicate, but discard everything on the
    // executor side so that (almost) nothing is actually read.
    parquetFile("large.parquet").where('key === 0).queryExecution.toRdd.mapPartitions { _ =>
      new Iterator[Row] {
        def hasNext = false
        def next() = ???
      }
    }.collect()

    Apparently we’re reading nothing here (except for the Parquet metadata
    in the footers), but the web UI still suggests that the input size
    of every task equals the file size. In addition, you may find log
    lines written by |InternalParquetRecordReader| like these:

    ...
    15/01/28 16:50:56 INFO FilterCompat: Filtering using predicate: eq(key, 0)
    15/01/28 16:50:56 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 0 records.
    ...

    These lines suggest that row group filtering does work as expected.

    So I’ll just close SPARK-5346
    <https://issues.apache.org/jira/browse/SPARK-5346> since task side
    metadata reading doesn’t affect row group filtering.

 3.

    SPARK-5463 <https://issues.apache.org/jira/browse/SPARK-5463> was
    created as an umbrella ticket for all Parquet filter push-down
    related issues.

    You may find more details there. Right now all sub-tasks there are
    either fixed or have PRs available.

Best,
Cheng

On 1/21/15 10:39 AM, Cheng Lian wrote:

> Oh yes, thanks for adding that using |sc.hadoopConfiguration.set| also 
> works :-)
>
>
> On Wed, Jan 21, 2015 at 7:11 AM, Yana Kadiyska 
> <yana.kadiyska@gmail.com <mailto:yana.kadiyska@gmail.com>> wrote:
>
>     Thanks for looking Cheng. Just to clarify in case other people
>     need this sooner, setting
>     sc.hadoopConfiguration.set("parquet.task.side.metadata", "false") did
>     work well in terms of dropping row groups/showing a small input size.
>     What was odd about that is that the overall time wasn't much
>     better...but maybe that was overhead from sending the metadata
>     client side.
>
>     Thanks again and looking forward to your fix
>
>     On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian <lian.cs.zju@gmail.com
>     <mailto:lian.cs.zju@gmail.com>> wrote:
>
>         Hey Yana,
>
>         Sorry for the late reply, I missed this important thread
>         somehow. And many thanks for reporting this. It turned out to
>         be a bug: filter pushdown is only enabled when using client
>         side metadata, which is not expected, because the task side
>         metadata code path is more performant. And I guess the
>         reason why setting |parquet.task.side.metadata| to |false|
>         didn't reduce the input size for you is that you set the
>         configuration via the Spark API, or put it into
>         |spark-defaults.conf|. This configuration goes into the Hadoop
>         |Configuration|, and Spark only merges properties whose names
>         start with |spark.hadoop| into Hadoop |Configuration|
>         instances. You may try to put the |parquet.task.side.metadata|
>         config into Hadoop |core-site.xml| and then re-run the query.
>         I can see significant differences by doing so.
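>
>         For example (a sketch of the two places mentioned above; the
>         value and file locations are only illustrative):
>
>             <!-- Hadoop core-site.xml -->
>             <property>
>               <name>parquet.task.side.metadata</name>
>               <value>false</value>
>             </property>
>
>         or, relying on Spark's |spark.hadoop.*| forwarding into Hadoop
>         |Configuration| instances:
>
>             # spark-defaults.conf
>             spark.hadoop.parquet.task.side.metadata  false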
>
>         I’ll open a JIRA and deliver a fix for this ASAP. Thanks again
>         for reporting all the details!
>
>         Cheng
>
>         On 1/13/15 12:56 PM, Yana Kadiyska wrote:
>
>>         Attempting to bump this up in case someone can help out after
>>         all. I spent a few good hours stepping through the code
>>         today, so I'll summarize my observations both in hope I get
>>         some help and to help others that might be looking into this:
>>
>>         1. I am setting *spark.sql.parquet.filterPushdown=true*
>>         2. I can see by stepping through the driver debugger that
>>         ParquetTableOperations.execute sets the filters via
>>         ParquetInputFormat.setFilterPredicate (I checked the conf
>>         object, things appear OK there)
>>         3. In FilteringParquetRowInputFormat, I go through the
>>         codepath for getTaskSideSplits. It seems that the codepath
>>         for getClientSideSplits would try to drop row groups, but I
>>         don't see anything similar in getTaskSideSplits.
>>
>>         Does anyone have pointers on where to look after this? Where
>>         does row group filtering happen in the case of
>>         getTaskSideSplits? I can attach to the executor, but I am not
>>         quite sure what Parquet-related code gets called on the
>>         executor side... I also don't see any messages in the executor
>>         logs related to filtering predicates.
>>         For comparison, I went through the getClientSideSplits path and
>>         can see that predicate pushdown works OK:
>>         sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")
>>
>>         15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy
>>         15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
>>         15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %) !
>>
>>         Is it possible that this is just a UI bug? I can see Input=4G
>>         when using ("parquet.task.side.metadata", "false") and
>>         Input=140G when using ("parquet.task.side.metadata", "true"),
>>         but the runtimes are very comparable.
>>
>>         [Inline image: web UI screenshot showing the Input column for the two jobs]
>>
>>
>>         JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
>>
>>
>>
>>         On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska
>>         <yana.kadiyska@gmail.com <mailto:yana.kadiyska@gmail.com>> wrote:
>>
>>             I am running the following (connecting to an external
>>             Hive Metastore):
>>
>>             /a/shark/spark/bin/spark-shell --master spark://ip:7077 \
>>               --conf *spark.sql.parquet.filterPushdown=true*
>>
>>             val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>>             and then ran two queries:
>>
>>             sqlContext.sql("select count(*) from table where partition='blah'")
>>
>>             and
>>
>>             sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")
>>
>>
>>             According to the Input tab in the UI, both scan about 140G
>>             of data, which is the size of my whole partition. So I
>>             have two questions:
>>
>>             1. Is there a way to tell from the plan whether a predicate
>>             pushdown is supposed to happen?
>>             I see this for the second query:
>>
>>             res0: org.apache.spark.sql.SchemaRDD =
>>             SchemaRDD[0] at RDD at SchemaRDD.scala:108
>>             == Query Plan ==
>>             == Physical Plan ==
>>             Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
>>               Exchange SinglePartition
>>                Aggregate true, [], [COUNT(1) AS PartialCount#49L]
>>                 OutputFaker []
>>                  Project []
>>                   ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>
>>
>>             2. Am I doing something obviously wrong that makes this not
>>             work? (I'm guessing it's not working because the input
>>             size for the second query is unchanged and the
>>             execution time is almost 2x as long.)
>>
>>             thanks in advance for any insights
>>
>>
>
>
>
