spark-dev mailing list archives

From William Wong <william1...@gmail.com>
Subject Re: Filter cannot be pushed via a Join
Date Tue, 18 Jun 2019 15:53:52 GMT
Dear all,

I am not sure whether this is expected behavior or whether I should report
it as a bug. Basically, the constraints of a Union become empty if any of its
children is turned into an empty LocalRelation. As a side effect, filters
cannot be inferred correctly by InferFiltersFromConstraints.
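To make the mechanism concrete, here is a toy model in plain Scala (no Spark dependency) of how a Union combines its children's constraints. It is a simplification inferred from the res44 output in step 4, not Catalyst's actual code: constraints shared by both children are kept, single-column constraints on the same column are OR-ed together, and everything else is dropped. The Constraint class and the merge/unionConstraints functions are hypothetical names, and the attribute rewriting Catalyst performs between children is ignored.

```scala
// Toy model of Union constraint merging (simplified, hypothetical names).
// A constraint is represented by its SQL text plus the columns it references.
final case class Constraint(refs: Set[String], sql: String)

object UnionConstraintModel {

  // Combine the constraints of two Union children:
  //  - constraints present in both children are kept as-is;
  //  - single-column constraints on a column constrained in both children
  //    are loosened into (A1 && A2 ...) || (B1 && B2 ...);
  //  - everything else is dropped.
  def merge(a: Set[Constraint], b: Set[Constraint]): Set[Constraint] = {
    val common = a intersect b

    def byColumn(s: Set[Constraint]): Map[String, Set[Constraint]] =
      (s -- common).filter(_.refs.size == 1).groupBy(_.refs.head)

    // Sorted so that the rendered conjunction is deterministic.
    def conjunction(cs: Set[Constraint]): String =
      cs.toSeq.map(_.sql).sorted.mkString(" && ")

    val colsA = byColumn(a)
    val colsB = byColumn(b)
    val loosened = (colsA.keySet intersect colsB.keySet).map { col =>
      Constraint(Set(col), s"((${conjunction(colsA(col))}) || (${conjunction(colsB(col))}))")
    }
    common ++ loosened
  }

  // Fold merge over the children from left to right (a Union has >= 1 child).
  // A child with no constraints (e.g. an empty LocalRelation) wipes out the result.
  def unionConstraints(children: Seq[Set[Constraint]]): Set[Constraint] =
    children.reduce(merge(_, _))

  def main(args: Array[String]): Unit = {
    val notNull = Constraint(Set("id"), "isnotnull(id)")
    def idEquals(v: String) = Constraint(Set("id"), s"id = $v")
    val emptyRelation = Set.empty[Constraint]

    // Like step 3: one branch keeps its filter, two were pruned to empty relations.
    println(unionConstraints(Seq(Set(notNull, idEquals("a")), emptyRelation, emptyRelation)))
    // prints: Set()

    // Like step 4: every branch still carries constraints on id.
    val inList = Constraint(Set("id"), "id IN (a,b,c,d)")
    val merged = unionConstraints(
      Seq(Set(notNull, inList, idEquals("a")), Set(notNull, inList, idEquals("b"))))
    merged.foreach(c => println(c.sql))
  }
}
```

Because merge keeps only what both sides share (or can OR together), a child with no constraints at all, such as an empty LocalRelation, leaves nothing for the parent Join to infer from.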

The issue can be reproduced with the following steps:
1) Prepare two tables:
* spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
* spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");

2) Create a union view on table1.
* spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)

3) Look at the optimized plan of this query. The filter t2.id = 'a' is not
inferred, and the constraints of the left side of the Join (the Union) are empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter (isnotnull(id#0) && (id#0 = a))
:  :  +- Relation[id#0,val#1] parquet
:  :- LocalRelation <empty>, [id#0, val#1]
:  :- LocalRelation <empty>, [id#0, val#1]
:  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:     +- Relation[id#0,val#1] parquet
+- Filter isnotnull(id#4)
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan.children(0).constraints
res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()

4) Modify the query so that no Union branch becomes an empty LocalRelation.
The filter t2.id IN ('a','b','c','d') is then inferred properly, and the
constraints of the left side are no longer empty.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join Inner, (id#0 = id#4)
:- Union
:  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :  +- Relation[id#0,val#1] parquet
:  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
:     +- Relation[id#0,val#1] parquet
+- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
   +- Relation[id#4,val#5] parquet

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) || (id#0 = c)) || NOT id#0 IN (a,b,c)))
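For reference, a custom rule like the one mentioned in the quoted message below might look roughly like this. It is a sketch against Spark 2.4's Catalyst classes (Rule, LogicalPlan, Union, LocalRelation), not the exact rule used there; RemoveEmptyUnionChildren and RemoveEmptyUnionChildrenDemo are hypothetical names. The rule is registered with SparkSessionExtensions.injectOptimizerRule, on the assumption that injected rules run among the operator-optimization rules, i.e. before InferFiltersFromConstraints.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule (not part of Spark): drop empty LocalRelation children
// of a Union so that they no longer erase the Union's constraints.
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {

  private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
    case l: LocalRelation => l.data.isEmpty
    case _                => false
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // A Union takes its output attributes from its first child, so this sketch
    // only prunes when the first child is kept; dropping it could change the
    // attribute ids that parent operators refer to.
    case u: Union
        if u.children.headOption.exists(c => !isEmptyLocalRelation(c)) &&
          u.children.tail.exists(isEmptyLocalRelation) =>
      val remaining = u.children.filterNot(isEmptyLocalRelation)
      if (remaining.size == 1) remaining.head else u.copy(children = remaining)
  }
}

object RemoveEmptyUnionChildrenDemo {
  def main(args: Array[String]): Unit = {
    // Run in a fresh JVM: getOrCreate() would otherwise return an existing
    // session that was built without the extension.
    val spark = SparkSession.builder()
      .master("local[*]")
      .withExtensions { extensions =>
        extensions.injectOptimizerRule(_ => RemoveEmptyUnionChildren)
      }
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET")
    spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET")
    // A temporary view keeps the sketch self-contained; step 2) uses a permanent view.
    spark.sql(
      """CREATE OR REPLACE TEMPORARY VIEW partitioned_table_1 AS
        |SELECT * FROM table1 WHERE id = 'a'
        |UNION ALL
        |SELECT * FROM table1 WHERE id = 'b'
        |UNION ALL
        |SELECT * FROM table1 WHERE id = 'c'
        |UNION ALL
        |SELECT * FROM table1 WHERE id NOT IN ('a','b','c')""".stripMargin)

    val df = spark.sql(
      "SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'")
    df.explain()
    // Constraints of the left side of the Join, for comparison with res40 above.
    println(df.queryExecution.optimizedPlan.children(0).constraints)

    spark.stop()
  }
}
```

A simpler workaround at the query level is to state the predicate for both sides explicitly (e.g. add AND t2.id = 'a'), so that nothing needs to be inferred.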


Thanks and regards,
William


On Sat, Jun 15, 2019 at 1:13 AM William Wong <william1104@gmail.com> wrote:

> Hi all,
>
> I would appreciate it if an expert could help explain this strange behavior.
>
> Interestingly, I implemented a custom rule to remove empty LocalRelation
> children under Union and ran the same query. The filter id = 'a' is then
> inferred for table2 and pushed through the Join.
>
> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
> == Physical Plan ==
> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
> ReadSchema: struct<id:string,val:string>
> :  +- *(2) Project [id#0, val#1]
> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>    +- *(3) Project [id#4, val#5]
>       +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>          +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
> PartitionFilters: [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],
> ReadSchema: struct<id:string,val:string>
>
> scala>
>
> Thanks and regards,
> William
>
>
>
> On Sat, Jun 15, 2019 at 12:13 AM William Wong <william1104@gmail.com>
> wrote:
>
>> Dear all,
>>
>> I created two tables.
>>
>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
>> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
>> metastore. hive.metastore.schema.verification is not enabled so recording
>> the schema version 1.2.0
>> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
>> returning NoSuchObjectException
>> res1: org.apache.spark.sql.DataFrame = []
>>
>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");
>> res2: org.apache.spark.sql.DataFrame = []
>>
>>
>> Here is the plan for a join on the id column (table2 joined with itself).
>> It looks good to me, as the filter id = 'a' is pushed down to both sides
>> as expected.
>>
>> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id AND t1.id ='a'").explain
>> == Physical Plan ==
>> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
>> :- *(2) Project [id#23, val#24]
>> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
>> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters:
>> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema:
>> struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(1) Project [id#68, val#69]
>>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>>          +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters:
>> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)], ReadSchema:
>> struct<id:string,val:string>
>>
>>
>> Then we created a view on table1 as a union of a few partitions, like this:
>>
>> scala> spark.sql("""
>>      | CREATE VIEW partitioned_table_1 AS
>>      | SELECT * FROM table1 WHERE id = 'a'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id = 'b'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id = 'c'
>>      | UNION ALL
>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>      | """.stripMargin)
>> res7: org.apache.spark.sql.DataFrame = []
>>
>>
>> In theory, selecting data through the view partitioned_table_1 should be
>> the same as selecting from table1.
>>
>> This query also pushes the filter id IN ('a','b','c','d') down to table2,
>> as expected.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
>> == Physical Plan ==
>> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN
>> (a,b,c,d))
>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>> :  :- *(2) Project [id#0, val#1]
>> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN
>> (a,b,c,d))
>> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>> :  :- *(3) Project [id#0, val#1]
>> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN
>> (a,b,c,d))
>> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>> :  +- *(4) Project [id#0, val#1]
>> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>> isnotnull(id#0))
>> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
>> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(5) Project [id#23, val#24]
>>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
>> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
>> isnotnull(id#23))
>>          +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>> PartitionFilters: [], PushedFilters: [In(id, [a,b,c,d]),
>> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
>> ReadSchema: struct<id:string,val:string>
>>
>> scala>
>>
>>
>> However, if we change the filter to id = 'a', something strange happens:
>> the filter id = 'a' is not pushed down to table2...
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
>> == Physical Plan ==
>> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>> ReadSchema: struct<id:string,val:string>
>> :  :- LocalTableScan <empty>, [id#0, val#1]
>> :  :- LocalTableScan <empty>, [id#0, val#1]
>> :  +- *(2) Project [id#0, val#1]
>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>> true]))
>>    +- *(3) Project [id#23, val#24]
>>       +- *(3) Filter isnotnull(id#23)
>>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>> struct<id:string,val:string>
>>
>>
>> I would appreciate any ideas on this. Many thanks.
>>
>> Best regards,
>> William
>>
>
