spark-issues mailing list archives

From "Nasir Ali (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files
Date Sat, 07 Dec 2019 15:33:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-30162:
------------------------------
    Description: 
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" method also differs
from Spark 2.4, which makes it hard to tell in 3.0 whether filters were pushed down. The code below
reproduces the bug:

 
{code:python}
# Reproduction (PySpark)
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")
df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data],
PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters:
[], ReadSchema: struct<id:double,ts:timestamp>
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data],
ReadSchema: struct<id:double,ts:timestamp>
{code}
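One way to compare the two scans without reading the whole plan by eye is to pull the PartitionFilters and PushedFilters entries out of the text that explain(True) prints. The sketch below is only an illustration and not part of Spark: capture_explain and scan_filters are hypothetical helper names, and it assumes that explain writes through Python's print (so redirect_stdout can capture it) and that the filter lists contain no nested square brackets.

```python
import contextlib
import io
import re


def capture_explain(df, extended=True):
    """Return the text that df.explain(extended) prints to stdout.

    Assumption: explain() prints from the Python side, so redirect_stdout sees it.
    """
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(extended)
    return buf.getvalue()


def scan_filters(plan_text):
    """Extract the PartitionFilters and PushedFilters entries from plan text.

    Returns a dict mapping each key to the raw text between its square
    brackets, or None when the key does not appear in the plan at all.
    Assumption: the bracketed list contains no nested square brackets.
    """
    result = {}
    for key in ("PartitionFilters", "PushedFilters"):
        # \s* tolerates a line wrap between the key and its bracketed list.
        match = re.search(key + r":\s*\[(.*?)\]", plan_text, flags=re.DOTALL)
        result[key] = match.group(1).strip() if match else None
    return result
```

Applied to the plans above, the Spark 2.4 text yields a non-empty PartitionFilters entry and an empty PushedFilters entry, while the 3.0.0-preview text yields None for both keys because its BatchScan line does not print them.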
I have tested this on a much larger dataset. Spark 3.0 tries to load all of the data and then applies
the filter, whereas Spark 2.4 pushes the filter down. The output above shows that Spark 2.4 applied
the partition filter but the Spark 3.0 preview did not.
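For context, partitionBy("user") writes one subdirectory per distinct value (user=usr1, user=usr2), so a filter on user can in principle be answered by reading only the matching directory. The following is a plain-Python sketch of that idea and not Spark's implementation; partition_dirs and prune are hypothetical names, and it ignores details such as escaping of special characters in partition values.

```python
import os
import tempfile


def partition_dirs(root, column):
    """Map partition value -> directory for `column=value` subdirectories of root."""
    prefix = column + "="
    found = {}
    for name in sorted(os.listdir(root)):
        path = os.path.join(root, name)
        # Skip plain files such as _SUCCESS and directories for other columns.
        if os.path.isdir(path) and name.startswith(prefix):
            found[name[len(prefix):]] = path
    return found


def prune(root, column, value):
    """Return only the directories needed to answer `column == value`."""
    dirs = partition_dirs(root, column)
    return [dirs[value]] if value in dirs else []


if __name__ == "__main__":
    # Build a throwaway layout resembling the one written by the reproduction.
    root = tempfile.mkdtemp()
    for d in ("user=usr1", "user=usr2"):
        os.mkdir(os.path.join(root, d))
    print(prune(root, "user", "usr2"))
```

With pruning, only the user=usr2 directory is touched; without it, every directory is scanned and the filter is applied to the rows afterwards, which matches the behaviour reported for the 3.0 preview.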

 

Minor: in Spark 3.0 the "explain()" output is truncated (maybe to a fixed length?), which makes it
hard to debug. Setting spark.sql.orc.cache.stripe.details.size=10000 doesn't work.

 

  was:
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" method also differs
from Spark 2.4, which makes it hard to tell in 3.0 whether filters were pushed down. The code below
reproduces the bug:

 
{code:python}
# Reproduction (PySpark)
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data],
PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters:
[], ReadSchema: struct<id:double,ts:timestamp>
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data],
ReadSchema: struct<id:double,ts:timestamp>
{code}
I have tested this on a much larger dataset. Spark 3.0 tries to load all of the data and then applies
the filter, whereas Spark 2.4 pushes the filter down. The output above shows that Spark 2.4 applied
the partition filter but the Spark 3.0 preview did not.

 

Minor: in Spark 3.0 the "explain()" output is truncated (maybe to a fixed length?), which makes it
hard to debug. Setting spark.sql.orc.cache.stripe.details.size=10000 doesn't work.

 


> Filter is not being pushed down for Parquet files
> -------------------------------------------------
>
>                 Key: SPARK-30162
>                 URL: https://issues.apache.org/jira/browse/SPARK-30162
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>            Reporter: Nasir Ali
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


