Dear all,

I am not sure whether this is expected behavior or whether I should report it as a bug. Basically, the constraints of a union can become empty if any of its children is turned into an empty local relation. As a side effect, filters can no longer be inferred correctly (by InferFiltersFromConstraints).
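
To illustrate what I mean, here is a toy model in plain Scala. It is not Spark's code: constraints are just strings, and the merge rule (keep what every child guarantees, and keep a disjunction of the leftovers only when both sides have some) is my assumption, chosen because it matches the outputs shown further down.

```scala
object UnionConstraintModel {
  // Toy stand-in for a constraint set: each constraint is only its textual form.
  type Constraints = Set[String]

  // ASSUMED merge rule, not taken from Spark's source:
  // keep the constraints common to both sides; if both sides have extra
  // constraints, keep their disjunction; otherwise nothing extra survives.
  def merge(a: Constraints, b: Constraints): Constraints = {
    val common = a intersect b
    val onlyA = (a diff common).toSeq.sorted
    val onlyB = (b diff common).toSeq.sorted
    val loosened: Constraints =
      if (onlyA.nonEmpty && onlyB.nonEmpty) {
        val left = onlyA.mkString(" && ")
        val right = onlyB.mkString(" && ")
        Set(s"($left) || ($right)")
      } else {
        Set.empty
      }
    common ++ loosened
  }

  // Constraints of a union: fold the merge over all children.
  def unionConstraints(children: Seq[Constraints]): Constraints =
    children.reduce((a, b) => merge(a, b))

  def main(args: Array[String]): Unit = {
    val first = Set("isnotnull(id)", "id = a")
    val last = Set("isnotnull(id)", "id = a", "NOT id IN (a,b,c)")
    val emptyLocal = Set.empty[String] // an empty local relation guarantees nothing

    // With the two empty children, nothing survives the merge.
    println(unionConstraints(Seq(first, emptyLocal, emptyLocal, last)))
    // Without them, "id = a" and "isnotnull(id)" survive.
    println(unionConstraints(Seq(first, last)))
  }
}
```

In this model a single child without constraints is enough to wipe out the constraints of the whole union, which is the same shape as the behavior I am seeing.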

We may reproduce the issue with the following setup:
1) Prepare two tables: 
* spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
* spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");

2) Create a union view on table1. 
* spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)

3) View the optimized plan of the following query. The filter t2.id = 'a' is not inferred, and the constraints of the left side of the join (the Union) are empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter (isnotnull(id#0) && (id#0 = a))
:  :  +- Relation[id#0,val#1] parquet
:  :- LocalRelation <empty>, [id#0, val#1]
:  :- LocalRelation <empty>, [id#0, val#1]
:  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:     +- Relation[id#0,val#1] parquet
+- Filter isnotnull(id#4)
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan.children(0).constraints
res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
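
To see which child of the union contributes nothing, the constraints can also be printed per child. A minimal sketch, assuming the catalyst classes are on the classpath (for example inside spark-shell via :paste); the object name UnionConstraintDump and the method name describe are made up for this example.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}

object UnionConstraintDump {
  // Returns one line per Union node in the plan, followed by one line
  // for each of that Union's children with the child's own constraints.
  def describe(plan: LogicalPlan): Seq[String] =
    plan.collect { case u: Union => u }.flatMap { u =>
      val header = s"Union: ${u.constraints}"
      val perChild = u.children.zipWithIndex.map { case (child, i) =>
        s"  child $i (${child.nodeName}): ${child.constraints}"
      }
      header +: perChild
    }
}
```

It can be applied to the plan above with UnionConstraintDump.describe(spark.sql(...).queryExecution.optimizedPlan).foreach(println), passing the same query string as in the previous command.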

4) Modify the query to avoid the empty local relations. The filter t2.id IN ('a','b','c','d') is then inferred properly, and the constraints of the left side are no longer empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
:     +- Relation[id#0,val#1] parquet
+- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) || (id#0 = c)) || NOT id#0 IN (a,b,c)))


Thanks and regards,
William


On Sat, Jun 15, 2019 at 1:13 AM William Wong <william1104@gmail.com> wrote:
Hi all, 

I would appreciate it if any expert could help with this strange behavior.

Interestingly, I implemented a custom rule to remove empty LocalRelation children under a Union and ran the same query. With that rule, the filter id = 'a' is inferred for table2 and pushed down through the join.
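
For reference, a rule of this kind could look roughly like the sketch below. It is illustrative only and not necessarily identical to the rule that produced the output that follows; the name RemoveEmptyUnionChildren is made up here. It deliberately keeps the first child, on the assumption that a Union takes its output attributes from its first child, and it leaves the plan alone if fewer than two children would remain.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule name; a sketch, not a tested production rule.
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {

  // A LocalRelation without any rows.
  private def isEmptyLocal(plan: LogicalPlan): Boolean = plan match {
    case l: LocalRelation => l.data.isEmpty
    case _ => false
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // Only rewrite when the first child stays in place (ASSUMPTION: the
    // Union's output attributes come from its first child, so dropping it
    // could break references in the operators above).
    case u: Union if !isEmptyLocal(u.children.head) && u.children.exists(isEmptyLocal) =>
      val kept = u.children.filterNot(isEmptyLocal)
      // A Union needs at least two children; otherwise leave it untouched.
      if (kept.size >= 2) u.copy(children = kept) else u
  }
}
```

Where such a rule runs relative to the constraint-based filter inference matters, since the inference has to see the union after the empty children are gone.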

scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(3) Project [id#4, val#5]
      +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
         +- *(3) FileScan parquet default.table2[id#4,val#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [EqualTo(id,a), IsNotNull(id)], ReadSchema: struct<id:string,val:string>


Thanks and regards,
William


On Sat, Jun 15, 2019 at 12:13 AM William Wong <william1104@gmail.com> wrote:
Dear all, 

I created two tables. 

scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
19/06/14 23:49:10 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/06/14 23:49:11 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");
res2: org.apache.spark.sql.DataFrame = []

Here is the plan for a join on the id column (the query below joins table2 with itself). It looks good to me, as the filter id = 'a' is pushed to both sides as expected.
scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id AND t1.id ='a'").explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
:- *(2) Project [id#23, val#24]
:  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
:     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(1) Project [id#68, val#69]
      +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
         +- *(1) FileScan parquet default.table2[id#68,val#69] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [EqualTo(id,a), IsNotNull(id)], ReadSchema: struct<id:string,val:string>

We then created a view on table1 by unioning a few partitions, like this:
scala> spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)
res7: org.apache.spark.sql.DataFrame = []

In theory, selecting data via the view partitioned_table_1 should be the same as selecting from table1 directly.

This query also pushes the filter id IN ('a','b','c','d') to table2 as expected.
scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
== Physical Plan ==
*(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id, [a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  :- *(2) Project [id#0, val#1]
:  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id, [a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  :- *(3) Project [id#0, val#1]
:  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id, [a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  +- *(4) Project [id#0, val#1]
:     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
:        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id, [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(5) Project [id#23, val#24]
      +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) && isnotnull(id#23))
         +- *(5) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [In(id, [a,b,c,d]), Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I..., ReadSchema: struct<id:string,val:string>


However, if we change the filter to id = 'a', something strange happens: the filter id = 'a' is not pushed to table2.
scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
:  :- LocalTableScan <empty>, [id#0, val#1]
:  :- LocalTableScan <empty>, [id#0, val#1]
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(3) Project [id#23, val#24]
      +- *(3) Filter isnotnull(id#23)
         +- *(3) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:string,val:string>

I would appreciate it if anyone has an idea about this. Many thanks.

Best regards,
William