spark-dev mailing list archives

From: William Wong <william1...@gmail.com>
Subject: Re: Filter cannot be pushed via a Join
Date: Wed, 19 Jun 2019 04:13:02 GMT
Hi Xiao,

Just reported this as SPARK-28103 in JIRA.

https://issues.apache.org/jira/browse/SPARK-28103

Thanks and Regards,
William

On Wed, 19 Jun 2019 at 1:35 AM, Xiao Li <gatorsmile@gmail.com> wrote:

> Hi, William,
>
> Thanks for reporting it. Could you open a JIRA?
>
> Cheers,
>
> Xiao
>
> William Wong <william1104@gmail.com> wrote on Tue, Jun 18, 2019 at 8:57 AM:
>
>> BTW, I noticed that a workaround is to create a custom rule that removes
>> the empty local relations from a Union. However, I am not 100% sure if it
>> is the right approach.
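>>
>> A minimal sketch of what I mean, assuming Spark 2.4 APIs (the rule name
>> below is hypothetical, not my exact code):
>>
>> import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
>> import org.apache.spark.sql.catalyst.rules.Rule
>>
>> object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {
>>   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>     case u @ Union(children) =>
>>       // Drop children that the optimizer collapsed into empty LocalRelations.
>>       val nonEmpty = children.filterNot {
>>         case l: LocalRelation => l.data.isEmpty
>>         case _                => false
>>       }
>>       if (nonEmpty.isEmpty || nonEmpty.size == children.size) u  // nothing to do
>>       else if (nonEmpty.size == 1) nonEmpty.head                 // Union of one child
>>       else u.copy(children = nonEmpty)
>>   }
>> }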
>>
>> On Tue, Jun 18, 2019 at 11:53 PM William Wong <william1104@gmail.com>
>> wrote:
>>
>>> Dear all,
>>>
>>> I am not sure whether this is expected behavior, or whether I should
>>> report it as a bug. Basically, the constraints of a Union can become
>>> empty if any of its children is turned into an empty local relation. The
>>> side effect is that filters cannot be inferred correctly (by
>>> InferFiltersFromConstraints).
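>>>
>>> (A conceptual illustration, not the actual Catalyst code: Union derives
>>> its constraints roughly as the intersection of its children's
>>> constraints, so a single empty child wipes them all out.)
>>>
>>> val childConstraints: Seq[Set[String]] = Seq(
>>>   Set("isnotnull(id)", "id = 'a'"),                // Filter branch
>>>   Set.empty[String],                               // empty LocalRelation
>>>   Set("isnotnull(id)", "NOT id IN ('a','b','c')")  // NOT IN branch
>>> )
>>> childConstraints.reduce(_ intersect _)              // Set() -- nothing left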
>>>
>>> We may reproduce the issue with the following setup:
>>> 1) Prepare two tables:
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>>> USING PARQUET");
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>>> USING PARQUET");
>>>
>>> 2) Create a union view on table1.
>>> * spark.sql("""
>>>      | CREATE VIEW partitioned_table_1 AS
>>>      | SELECT * FROM table1 WHERE id = 'a'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id = 'b'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id = 'c'
>>>      | UNION ALL
>>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>      | """.stripMargin)
>>>
>>> 3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot
>>> be inferred, and we can see that the constraints of the left side (the
>>> Union) are empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- LocalRelation <empty>, [id#0, val#1]
>>> :  :- LocalRelation <empty>, [id#0, val#1]
>>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>>> :     +- Relation[id#0,val#1] parquet
>>> +- Filter isnotnull(id#4)
>>>    +- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id =
>>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>>
>>> 4) Modify the query to avoid the empty local relations. The filter t2.id
>>> IN ('a','b','c','d') is then inferred properly, and the constraints of
>>> the left side are no longer empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan
>>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>> isnotnull(id#0))
>>> :     +- Relation[id#0,val#1] parquet
>>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>>    +- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
>>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>>
>>>
>>> Thanks and regards,
>>> William
>>>
>>>
>>> On Sat, Jun 15, 2019 at 1:13 AM William Wong <william1104@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I would appreciate any expert's help with this strange behavior.
>>>>
>>>> It is interesting that when I implemented a custom rule to remove empty
>>>> LocalRelation children under Union and ran the same query, the filter
>>>> id = 'a' was inferred for table2 and pushed through the Join.
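>>>>
>>>> (spark2 is a separate session with the custom rule registered. As a
>>>> sketch, one way to wire it in; note that extraOptimizations run in a
>>>> late optimizer batch, so where the rule lands relative to
>>>> InferFiltersFromConstraints matters:)
>>>>
>>>> val spark2 = spark.newSession()
>>>> spark2.experimental.extraOptimizations = Seq(RemoveEmptyUnionChildren)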
>>>>
>>>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
>>>> WHERE t1.id = t2.id AND t1.id = 'a'").explain
>>>> == Physical Plan ==
>>>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>>>> :- Union
>>>> :  :- *(1) Project [id#0, val#1]
>>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>>> ReadSchema: struct<id:string,val:string>
>>>> :  +- *(2) Project [id#0, val#1]
>>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>> true]))
>>>>    +- *(3) Project [id#4, val#5]
>>>>       +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
>>>>          +- *(3) FileScan parquet default.table2[id#4,val#5] Batched:
>>>> true, Format: Parquet, Location:
>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>> PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
>>>> ReadSchema: struct<id:string,val:string>
>>>>
>>>> scala>
>>>>
>>>> Thanks and regards,
>>>> William
>>>>
>>>>
>>>>
>>>> On Sat, Jun 15, 2019 at 12:13 AM William Wong <william1104@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear all,
>>>>>
>>>>> I created two tables.
>>>>>
>>>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val
>>>>> string) USING PARQUET");
>>>>> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
>>>>> metastore. hive.metastore.schema.verification is not enabled so recording
>>>>> the schema version 1.2.0
>>>>> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
>>>>> returning NoSuchObjectException
>>>>> res1: org.apache.spark.sql.DataFrame = []
>>>>>
>>>>> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val
>>>>> string) USING PARQUET");
>>>>> res2: org.apache.spark.sql.DataFrame = []
>>>>>
>>>>>
>>>>> Here is the plan of joining on the id column. It looks good to me, as
>>>>> the filter id = 'a' is pushed to both sides as expected.
>>>>>
>>>>> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id =
>>>>> t2.id AND t1.id ='a'").explain
>>>>> == Physical Plan ==
>>>>> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
>>>>> :- *(2) Project [id#23, val#24]
>>>>> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
>>>>> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>>>> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
>>>>> struct<id:string,val:string>
>>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>>> true]))
>>>>>    +- *(1) Project [id#68, val#69]
>>>>>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>>>>>          +- *(1) FileScan parquet default.table2[id#68,val#69]
>>>>> Batched: true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters:
>>>>> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
>>>>> struct<id:string,val:string>
>>>>>
>>>>>
>>>>> Then we created a view on table1 by unioning a few partitions like
>>>>> this:
>>>>>
>>>>> scala> spark.sql("""
>>>>>      | CREATE VIEW partitioned_table_1 AS
>>>>>      | SELECT * FROM table1 WHERE id = 'a'
>>>>>      | UNION ALL
>>>>>      | SELECT * FROM table1 WHERE id = 'b'
>>>>>      | UNION ALL
>>>>>      | SELECT * FROM table1 WHERE id = 'c'
>>>>>      | UNION ALL
>>>>>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>>>      | """.stripMargin)
>>>>> res7: org.apache.spark.sql.DataFrame = []
>>>>>
>>>>>
>>>>> In theory, selecting data via this view partitioned_table_1 should
>>>>> return the same rows as selecting via the table table1.
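>>>>>
>>>>> (A hypothetical sanity check of that claim; rows with a NULL id are the
>>>>> one exception, since id NOT IN ('a','b','c') evaluates to NULL and drops
>>>>> them from the last branch:)
>>>>>
>>>>> spark.sql("SELECT * FROM table1 WHERE id IS NOT NULL " +
>>>>>   "EXCEPT SELECT * FROM partitioned_table_1").count  // expected: 0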
>>>>>
>>>>> This query can also push the filter id IN ('a','b','c','d') to table2
>>>>> as expected.
>>>>>
>>>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
>>>>> WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
>>>>> == Physical Plan ==
>>>>> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>>>> :- Union
>>>>> :  :- *(1) Project [id#0, val#1]
>>>>> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
>>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>>> :  :- *(2) Project [id#0, val#1]
>>>>> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>>>>> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
>>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>>> :  :- *(3) Project [id#0, val#1]
>>>>> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>>>>> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
>>>>> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
>>>>> :  +- *(4) Project [id#0, val#1]
>>>>> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
>>>>> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
>>>>> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>>> true]))
>>>>>    +- *(5) Project [id#23, val#24]
>>>>>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) && isnotnull(id#23))
>>>>>          +- *(5) FileScan parquet default.table2[id#23,val#24]
>>>>> Batched: true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>>> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
>>>>> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
>>>>> *ReadSchema: struct<id:string,val:string>
>>>>>
>>>>> scala>
>>>>>
>>>>>
>>>>> However, if we change the filter to id = 'a', something strange
>>>>> happens: the filter cannot be pushed down to table2...
>>>>>
>>>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
>>>>> WHERE t1.id = t2.id AND t1.id = 'a'").explain
>>>>> == Physical Plan ==
>>>>> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
>>>>> :- Union
>>>>> :  :- *(1) Project [id#0, val#1]
>>>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>>>> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>>>> ReadSchema: struct<id:string,val:string>
>>>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>>>> :  :- LocalTableScan <empty>, [id#0, val#1]
>>>>> :  +- *(2) Project [id#0, val#1]
>>>>> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>>>>> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>>>>> true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>>>>> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
>>>>> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
>>>>> true]))
>>>>>    +- *(3) Project [id#23, val#24]
>>>>>       +- *(3) Filter isnotnull(id#23)
>>>>>          +- *(3) FileScan parquet default.table2[id#23,val#24]
>>>>> Batched: true, Format: Parquet, Location:
>>>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
>>>>> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
>>>>> struct<id:string,val:string>
>>>>>
>>>>>
>>>>> I would appreciate it if anyone has an idea about this. Many thanks.
>>>>>
>>>>> Best regards,
>>>>> William
>>>>>
>>>>
