spark-user mailing list archives

From Cheng Lian <lian.cs....@gmail.com>
Subject Re: [SparkSQL] Try2: Parquet predicate pushdown troubles
Date Wed, 21 Jan 2015 02:07:05 GMT
Hey Yana,

Sorry for the late reply, I somehow missed this important thread. And many 
thanks for reporting this. It turned out to be a bug: filter pushdown is 
only enabled when using client side metadata, which is not expected, 
because the task side metadata code path is more performant. And I guess 
the reason why setting parquet.task.side.metadata to false didn't reduce 
input size for you is that you set the configuration with the Spark API, 
or put it into spark-defaults.conf. This configuration goes to the Hadoop 
Configuration, and Spark only merges properties whose names start with 
spark.hadoop into Hadoop Configuration instances. You may try putting the 
parquet.task.side.metadata config into Hadoop core-site.xml and then 
re-running the query. I can see significant differences by doing so.
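
Alternatively, since only spark.hadoop-prefixed properties get merged, here 
is a minimal sketch of that route (assuming the Spark 1.x SparkConf API; 
the prefix is stripped during the merge):

    import org.apache.spark.{SparkConf, SparkContext}

    // Properties named "spark.hadoop.*" are copied into the Hadoop
    // Configuration instances Spark builds, with the prefix removed.
    val conf = new SparkConf()
      .setMaster("local[*]")  // for a local test; use your cluster master
      .setAppName("parquet-task-side-metadata-test")
      .set("spark.hadoop.parquet.task.side.metadata", "false")
    val sc = new SparkContext(conf)

    // The Hadoop side should now see the plain key:
    println(sc.hadoopConfiguration.get("parquet.task.side.metadata"))

The same property can also be passed to spark-shell as 
--conf spark.hadoop.parquet.task.side.metadata=false.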

I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for 
reporting all the details!

Cheng

On 1/13/15 12:56 PM, Yana Kadiyska wrote:

> Attempting to bump this up in case someone can help out after all. I 
> spent a few good hours stepping through the code today, so I'll 
> summarize my observations, both in the hope of getting some help and to 
> help others who might be looking into this:
>
> 1. I am setting spark.sql.parquet.filterPushdown=true
> 2. I can see by stepping through the driver debugger that 
> ParquetTableOperations.execute sets the filters via 
> ParquetInputFormat.setFilterPredicate (I checked the conf object, 
> things appear OK there; see the sketch after this list)
> 3. In FilteringParquetRowInputFormat, I get through the codepath for 
> getTaskSideSplits. It seems that the codepath for getClientSideSplits 
> would try to drop rowGroups, but I don't see anything similar in 
> getTaskSideSplits.
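>
> (For anyone reproducing point 2, a sketch of the parquet-mr call in 
> question; package names assume the parquet-mr 1.6-era API bundled with 
> Spark 1.2, and the predicate mirrors the eq(epoch, ...) filter in the 
> logs below:)
>
>     import org.apache.hadoop.conf.Configuration
>     import parquet.filter2.predicate.FilterApi
>     import parquet.hadoop.ParquetInputFormat
>
>     val conf = new Configuration()
>     // Register the row-group filter; split planning can then drop row
>     // groups whose column statistics cannot satisfy the predicate.
>     ParquetInputFormat.setFilterPredicate(conf,
>       FilterApi.eq(FilterApi.longColumn("epoch"),
>         java.lang.Long.valueOf(1417384800L)))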
>
> Does anyone have pointers on where to look after this? Where does 
> rowgroup filtering happen in the case of getTaskSideSplits? I can 
> attach to the executor but am not quite sure what Parquet-related code 
> gets called executor side... I also don't see any messages in the 
> executor logs related to filtering predicates.
> For comparison, I went through the getClientSideSplits codepath and can 
> see that predicate pushdown works OK:
> sc.hadoopConfiguration.set("parquet.task.side.metadata","false")
>
> 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy
> 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
> 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %) !
>
> Is it possible that this is just a UI bug? I can see Input=4G when 
> using ("parquet.task.side.metadata","false") and Input=140G when using 
> ("parquet.task.side.metadata","true"), but the runtimes are very 
> comparable.
>
> [Inline image: Spark UI screenshot showing Input sizes for the two jobs]
>
>
> JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
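>
> (If it is just a UI reporting issue, one way to cross-check is to log 
> per-task input bytes directly; a sketch assuming the Spark 1.2-era 
> listener API:)
>
>     import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
>     sc.addSparkListener(new SparkListener {
>       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>         // inputMetrics is only defined for tasks that read input
>         for (m <- Option(taskEnd.taskMetrics); in <- m.inputMetrics) {
>           println(s"stage ${taskEnd.stageId}: read ${in.bytesRead} bytes")
>         }
>       }
>     })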
>
>
>
> On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska <yana.kadiyska@gmail.com> 
> wrote:
>
>     I am running the following (connecting to an external Hive Metastore)
>
>     /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf
>     spark.sql.parquet.filterPushdown=true
>
>     val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
>     and then ran two queries:
>
>     sqlContext.sql("select count(*) from table where partition='blah' ")
>     and
>     sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")
>
>     According to the Input tab in the UI, both scan about 140G of data,
>     which is the size of my whole partition. So I have two questions --
>
>     1. Is there a way to tell from the plan whether a predicate pushdown
>     is supposed to happen?
>     I see this for the second query:
>
>     res0: org.apache.spark.sql.SchemaRDD =
>     SchemaRDD[0] at RDD at SchemaRDD.scala:108
>     == Query Plan ==
>     == Physical Plan ==
>     Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
>       Exchange SinglePartition
>        Aggregate true, [], [COUNT(1) AS PartialCount#49L]
>         OutputFaker []
>          Project []
>           ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>
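>
>     (Aside: to dump the full query execution, rather than the truncated
>     REPL output above, something like the following should work, assuming
>     the Spark 1.2-era SchemaRDD API; the logical plans it prints should
>     at least show the epoch filter:)
>
>         val rdd = sqlContext.sql(
>           "select count(*) from table where partition='blah' and epoch=1415561604")
>         // queryExecution renders the parsed/analyzed/optimized logical
>         // plans and the physical plan in one string
>         println(rdd.queryExecution)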
>
>     2. Am I doing something obviously wrong that this is not working?
>     (I'm guessing it's not working because the input size for the second
>     query shows unchanged and the execution time is almost 2x as long.)
>
>     thanks in advance for any insights
>
>
