spark-issues mailing list archives

From "Everett Anderson (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19586) Incorrect push down filter for double negative in SQL
Date Tue, 14 Feb 2017 02:41:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Everett Anderson updated SPARK-19586:
-------------------------------------
    Description: 
Opening this because it's a somewhat serious issue in the 2.0.x tree, in case a 2.0.3 release is planned; it is already fixed in 2.1.0.

While the query works in 1.6.2 and 2.1.0, 2.0.2 appears to have a significant filter optimization error.

Example:

{noformat}
// Create some fake data

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
    Row(1, "fred"),
    Row(2, "amy"),
    Row(3, null)))

val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("username", StringType, nullable = true)))
    
val data = sqlContext.createDataFrame(rowsRDD, schema)
val path = "/tmp/test_data"

data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)

testData.registerTempTable("filter_test_table")
{noformat}
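
For context (this check is my addition, not part of the original report): with the three sample rows above, only Row(3, null) has a null username, so a correct plan for the double-negative query below should count exactly one row. A quick sanity check, assuming the same spark-shell session:

{noformat}
// Sanity check (added here, not in the original repro): the only row with a
// null username is Row(3, null), so this should print 1 when the plan is correct.
val result = sqlContext.sql(
  "select count(*) from filter_test_table where not(username is not null)")
result.show()
{noformat}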

{noformat}
%sql
explain select count(*) from filter_test_table where not( username is not null)
{noformat}

or

{noformat}
spark.sql("select count(*) from filter_test_table where not( username is not null)").explain
{noformat}
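
For completeness, the same predicate can also be written through the Column API (this variant is my addition, not in the original repro); since it goes through the same Catalyst optimizer, it should presumably show the same plan:

{noformat}
// Column-API form of the same double-negative predicate (assumption: it is
// optimized by the same rules as the SQL version, so the plan should match).
import org.apache.spark.sql.functions.not

testData.filter(not(testData("username").isNotNull)).explain()
{noformat}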

In 2.0.2, I'm seeing

{noformat}
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
            +- *BatchedScan parquet default.<hive table name>[username#35] Format: ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [], PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema: struct<username:string>
{noformat}

which seems like both an impossible Filter and an impossible pushed filter: the conjunction isnotnull(username) && NOT isnotnull(username) can never be satisfied, so no row can pass it.

In Spark 1.6.2 it was

{noformat}
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
      +- Project
         +- Filter NOT isnotnull(username#1590)
            +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>, PushedFilters: [Not(IsNotNull(username))]
{noformat}

and in 2.1.0 it's working again as

{noformat}
== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>
{noformat}

While it's easy for a human in interactive use to work around this by removing the double negative, it's harder when the predicate is constructed programmatically.
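
A minimal sketch of that workaround, assuming the predicate is built with Column expressions (the value names here are illustrative, not from the original report): apply the "remove the double negative" rewrite yourself and emit IS NULL directly.

{noformat}
// Workaround sketch (my addition): use isNull directly instead of wrapping
// isNotNull in not(...). The two predicates are logically equivalent; the
// first is simply the double negative removed before Spark sees it.
import org.apache.spark.sql.functions.not

val viaIsNull    = testData.filter(testData("username").isNull)
val viaDoubleNeg = testData.filter(not(testData("username").isNotNull))  // the form that produces the 2.0.2 plan above

viaIsNull.explain()
{noformat}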



> Incorrect push down filter for double negative in SQL
> -----------------------------------------------------
>
>                 Key: SPARK-19586
>                 URL: https://issues.apache.org/jira/browse/SPARK-19586
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Everett Anderson
>             Fix For: 2.1.0

