spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yana Kadiyska <yana.kadiy...@gmail.com>
Subject Re: [SparkSQL] Try2: Parquet predicate pushdown troubles
Date Wed, 21 Jan 2015 15:11:30 GMT
Thanks for looking Cheng. Just to clarify in case other people need this
sooner, setting sc.hadoopConfiguration.set("parquet.task.side.metadata","
false")did work well in terms of dropping rowgroups/showing small input
size. What was odd about that is that the overall time wasn't much
better...but maybe that was overhead from sending the metadata clientside.

Thanks again and looking forward to your fix

On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian <lian.cs.zju@gmail.com> wrote:

>  Hey Yana,
>
> Sorry for the late reply, missed this important thread somehow. And many
> thanks for reporting this. It turned out to be a bug — filter pushdown is
> only enabled when using client side metadata, which is not expected,
> because task side metadata code path is more performant. And I guess that
> the reason why setting parquet.task.side.metadata to false didn’t reduce
> input size for you is because you set the configuration with Spark API, or
> put it into spark-defaults.conf. This configuration goes to Hadoop
> Configuration, and Spark only merge properties whose names start with
> spark.hadoop into Hadoop Configuration instances. You may try to put
> parquet.task.side.metadata config into Hadoop core-site.xml, and then
> re-run the query. I can see significant differences by doing so.
>
> I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for
> reporting all the details!
>
> Cheng
>
> On 1/13/15 12:56 PM, Yana Kadiyska wrote:
>
>   Attempting to bump this up in case someone can help out after all. I
> spent a few good hours stepping through the code today, so I'll summarize
> my observations both in hope I get some help and to help others that might
> be looking into this:
>
>  1. I am setting *spark.sql.parquet.**filterPushdown=true*
> 2. I can see by stepping through the driver debugger that
> PaquetTableOperations.execute sets the filters via
> ParquetInputFormat.setFilterPredicate (I checked the conf object, things
> appear OK there)
> 3. In FilteringParquetRowInputFormat, I get through the codepath for
> getTaskSideSplits. It seems that the codepath for getClientSideSplits would
> try to drop rowGroups but I don't see similar in getTaskSideSplit.
>
>  Does anyone have pointers on where to look after this? Where is rowgroup
> filtering happening in the case of getTaskSideSplits? I can attach to the
> executor but am not quite sure what code related to Parquet gets called
> executor side...also don't see any messages in the executor logs related to
> Filtering predicates.
>
> For comparison, I went through the getClientSideSplits and can see that
> predicate pushdown works OK:
>
>
> sc.hadoopConfiguration.set("parquet.task.side.metadata","false")
>
> 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split
Strategy
> 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
> 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do
not pass filter predicate (28 %) !
>
> ​
>
>  Is it possible that this is just a UI bug? I can see Input=4G when using
> ("parquet.task.side.metadata","false") and Input=140G when using
> ("parquet.task.side.metadata","true") but the runtimes are very comparable?
>
>  [image: Inline image 1]
>
>
>  JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
>
>
>
>  On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska <yana.kadiyska@gmail.com>
> wrote:
>
>> I am running the following (connecting to an external Hive Metastore)
>>
>>   /a/shark/spark/bin/spark-shell --master spark://ip:7077  --conf
>> *spark.sql.parquet.filterPushdown=true*
>>
>>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>
>>  and then ran two queries:
>>
>> sqlContext.sql("select count(*) from table where partition='blah' ")
>> andsqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")
>>
>> ​
>>
>>  According to the Input tab in the UI both scan about 140G of data which
>> is the size of my whole partition. So I have two questions --
>>
>>  1. is there a way to tell from the plan if a predicate pushdown is
>> supposed to happen?
>> I see this for the second query
>>
>> res0: org.apache.spark.sql.SchemaRDD =
>> SchemaRDD[0] at RDD at SchemaRDD.scala:108
>> == Query Plan ==
>> == Physical Plan ==
>> Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
>>  Exchange SinglePartition
>>   Aggregate true, [], [COUNT(1) AS PartialCount#49L]
>>    OutputFaker []
>>     Project []
>>      ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>
>>
>> ​
>>  2. am I doing something obviously wrong that this is not working? (Im
>> guessing it's not woring because the input size for the second query shows
>> unchanged and the execution time is almost 2x as long)
>>
>>  thanks in advance for any insights
>>
>>
>    ​
>

Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message