flink-issues mailing list archives

From "godfrey he (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-12575) Introduce planner rules to remove redundant shuffle and collation
Date Tue, 21 May 2019 12:22:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-12575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-12575:
-------------------------------
    Description: 
{{Exchange}} and {{Sort}} are the heaviest operators. They are created in {{FlinkExpandConversionRule}}
when an operator, in the planner rules, requires its inputs to satisfy a distribution trait or a
collation trait. However, many operators can themselves provide a distribution or a collation: e.g.
{{BatchExecHashAggregate}} and {{BatchExecHashJoin}} can provide distribution on their shuffle keys,
and {{BatchExecSortMergeJoin}} can provide both distribution and collation on its join keys. If the
provided traits satisfy the required traits, the {{Exchange}} or the {{Sort}} is redundant.
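As a rough illustration of when an enforcer becomes redundant, the following Python sketch models a simplified trait check. Everything in it ({{Traits}}, {{enforcers_needed}} and the rules themselves) is hypothetical and is not Flink's or Calcite's API. It assumes that a hash distribution is satisfied only by exactly the same key list, that a collation is satisfied when the required sort keys are a prefix of the provided ones (all ascending), and that a newly inserted {{Exchange}} discards any existing order.

```python
from dataclasses import dataclass
from typing import List, Tuple

Keys = Tuple[str, ...]


@dataclass(frozen=True)
class Traits:
    """Hypothetical, simplified physical traits of an operator's output."""
    hash_keys: Keys  # columns the data is hash-distributed on; () means none
    sort_keys: Keys  # columns the data is sorted on, all ASC; () means unsorted


def distribution_satisfied(provided: Traits, required: Keys) -> bool:
    # No requirement is always satisfied; otherwise demand the identical key list.
    return not required or provided.hash_keys == required


def collation_satisfied(provided: Traits, required: Keys) -> bool:
    # Data sorted on (c, a) is also sorted on (c): required keys must be a prefix.
    return provided.sort_keys[:len(required)] == required


def enforcers_needed(provided: Traits, hash_keys: Keys, sort_keys: Keys) -> List[str]:
    """Names of the enforcer operators that would still have to be inserted."""
    if not distribution_satisfied(provided, hash_keys):
        # Assumption: a fresh shuffle destroys any existing order.
        return ["Exchange", "Sort"] if sort_keys else ["Exchange"]
    if not collation_satisfied(provided, sort_keys):
        return ["Sort"]
    return []  # both traits already provided: Exchange and Sort are redundant


# A sort-merge join on (a, b) outputs data hashed and sorted on its join keys.
smj_output = Traits(hash_keys=("a", "b"), sort_keys=("a", "b"))
print(enforcers_needed(smj_output, ("a", "b"), ("a", "b")))       # []
print(enforcers_needed(smj_output, ("a", "b"), ("a", "b", "c")))  # ['Sort']
print(enforcers_needed(smj_output, ("c",), ("c",)))               # ['Exchange', 'Sort']
```

The exact-match rule for hash keys is deliberately strict; a real planner may accept more cases, which is why this is only a sketch of the idea.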
For example:
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left outer join t2
on a1 = d1 and b1 = e1

the physical plan after the redundant Exchanges and Sorts are removed:
SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
:- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}

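In the above physical plan, the Exchanges between the SortMergeJoins are redundant because their
shuffle keys are equivalent under the join conditions (e.g. {{a = d}} and {{b = e}}), and the Sorts
on the left-hand side of the top two SortMergeJoins are redundant because their input is already
sorted (hence {{leftSorted=[true]}}).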
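Why is no {{Exchange}} needed between the joins? A minimal sketch, assuming a simplified model in which a sort-merge join's output stays hash-distributed on the join keys of the side whose rows are always preserved, and for an INNER join on both sides' keys, because {{left_keys[i] = right_keys[i]}} holds on every output row (this also assumes the paired columns have the same types, as in the schema above). The helper names are hypothetical, not Flink planner methods; the same reasoning applies to the sort order, which is why the left inputs are already sorted.

```python
from typing import List, Tuple

Keys = Tuple[str, ...]


def smj_output_hash_keys(left_keys: Keys, right_keys: Keys, join_type: str) -> List[Keys]:
    """Key lists a sort-merge join's output can be treated as hash-distributed on.

    Hypothetical, simplified model. Rows from a side that is never NULL-extended
    keep the partitioning of that side's join keys. For an INNER join every output
    row has left_keys[i] == right_keys[i], so both key lists describe the same
    partitioning. Outer joins produce NULL-extended keys on the other side.
    """
    provided: List[Keys] = []
    if join_type in ("INNER", "LEFT"):
        provided.append(left_keys)
    if join_type in ("INNER", "RIGHT"):
        provided.append(right_keys)
    return provided


def exchange_needed(provided: List[Keys], required: Keys) -> bool:
    # Simplification: only an identical key list counts as satisfying the requirement.
    return required not in provided


# x JOIN y ON a = d AND b = e
bottom = smj_output_hash_keys(("a", "b"), ("d", "e"), "INNER")
# The middle join (ON d = a1 AND e = b1) needs its left input hashed on (d, e).
print(exchange_needed(bottom, ("d", "e")))      # False
middle = smj_output_hash_keys(("d", "e"), ("a1", "b1"), "INNER")
# The top LEFT OUTER join (ON a1 = d1 AND b1 = e1) needs its left input hashed on (a1, b1).
print(exchange_needed(middle, ("a1", "b1")))    # False
# The LEFT OUTER join's output is only reliably partitioned on its left keys.
top = smj_output_hash_keys(("a1", "b1"), ("d1", "e1"), "LEFT")
print(exchange_needed(top, ("d1", "e1")))       # True
```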

Another situation where the shuffle and collation can be removed is between multiple {{Over}}s,
e.g.:
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
    COUNT(*) OVER (PARTITION BY c ORDER BY a),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    RANK() OVER (PARTITION BY c ORDER BY a, c),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    COUNT(*) OVER (PARTITION BY c ORDER BY c)
 FROM MyTable

the physical plan after the redundant Exchanges and Sorts are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], window#1=[RANK(*) ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a), $SUM0(a) ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}
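The {{Exchange}} and {{Sort}} between the top two OverAggregates are redundant: the input of the
top OverAggregate is already hash-distributed on {{c}} and sorted on {{c ASC, a ASC}}, which
satisfies its required distribution {{hash[c]}} and collation {{c ASC}}.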
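A similar sketch for the {{Over}} case, again with hypothetical helpers and all sort keys ascending. It assumes an over-aggregate requires its input to be hash-distributed on the PARTITION BY keys and sorted on the PARTITION BY keys followed by the ORDER BY keys, skipping columns already in the list; under that assumption it reproduces the {{Sort(orderBy=[b ASC, a ASC])}} and {{Sort(orderBy=[c ASC, a ASC])}} shown in the plan.

```python
from typing import List, Tuple

Keys = Tuple[str, ...]


def over_requirements(partition_by: Keys, order_by: Keys) -> Tuple[Keys, Keys]:
    """Hypothetical: (hash keys, sort keys) an over-aggregate needs from its input.

    Sort keys are the PARTITION BY keys followed by the ORDER BY keys; a column
    that is already in the list is skipped, because sorting on it again
    changes nothing (e.g. ORDER BY c within PARTITION BY c).
    """
    sort_keys: List[str] = list(partition_by)
    for key in order_by:
        if key not in sort_keys:
            sort_keys.append(key)
    return partition_by, tuple(sort_keys)


def exchange_and_sort_redundant(provided_hash: Keys, provided_sort: Keys,
                                partition_by: Keys, order_by: Keys) -> bool:
    required_hash, required_sort = over_requirements(partition_by, order_by)
    # Same shuffle keys, and the required order is a prefix of the existing order.
    return (provided_hash == required_hash
            and provided_sort[:len(required_sort)] == required_sort)


# Middle OverAggregate (PARTITION BY c ORDER BY a; RANK ... ORDER BY a, c):
print(over_requirements(("c",), ("a", "c")))  # (('c',), ('c', 'a'))
# Its input comes from the PARTITION BY b aggregate: hash[b], sorted on (b, a).
print(exchange_and_sort_redundant(("b",), ("b", "a"), ("c",), ("a",)))  # False
# Top OverAggregate (PARTITION BY c ORDER BY c) reads hash[c], sorted on (c, a).
print(exchange_and_sort_redundant(("c",), ("c", "a"), ("c",), ("c",)))  # True
```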



> Introduce planner rules to remove redundant shuffle and collation
> -----------------------------------------------------------------
>
>                 Key: FLINK-12575
>                 URL: https://issues.apache.org/jira/browse/FLINK-12575
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
