[ https://issues.apache.org/jira/browse/FLINK-12192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he updated FLINK-12192:

Description:
This issue aims to support generating an optimized logical plan for grouping sets and distinct
aggregates (mentioned in FLINK-12076 and FLINK-12098).
For batch, a query with distinct aggregates will be rewritten into two non-distinct aggregates
by an extended [AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]:
the first aggregate computes the distinct keys and the non-distinct aggregate functions, and the
second aggregate computes the distinct aggregate functions based on the first aggregate's result.
The first aggregate has grouping sets if there is more than one distinct aggregate function
on different fields.
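The two-step rewrite can be illustrated with a small simulation. This is only a sketch of the idea for a single distinct aggregate, not the planner rule itself; the query, the table t and the columns g, x, v are hypothetical and chosen only for illustration.

```python
from collections import defaultdict

# Hypothetical rows (g, x, v) for the query:
#   SELECT g, COUNT(DISTINCT x), SUM(v) FROM t GROUP BY g
rows = [("g1", 1, 10), ("g1", 1, 20), ("g1", 2, 5), ("g2", 3, 7)]


def first_aggregate(rows):
    """GROUP BY g, x: one output row per distinct key, carrying a partial SUM(v)."""
    partial = defaultdict(int)
    for g, x, v in rows:
        partial[(g, x)] += v
    return [(g, x, s) for (g, x), s in partial.items()]


def second_aggregate(first):
    """GROUP BY g: plain COUNT(x) over the already-distinct keys, SUM of the partial sums."""
    acc = defaultdict(lambda: [0, 0])
    for g, x, s in first:
        if x is not None:  # COUNT ignores NULL values
            acc[g][0] += 1
        acc[g][1] += s
    return {g: (cnt, total) for g, (cnt, total) in acc.items()}


def direct(rows):
    """Reference result computed directly with a per-group set of distinct values."""
    seen, sums = defaultdict(set), defaultdict(int)
    for g, x, v in rows:
        if x is not None:
            seen[g].add(x)
        sums[g] += v
    return {g: (len(seen[g]), sums[g]) for g in sums}


rewritten = second_aggregate(first_aggregate(rows))
print(rewritten)
```

Neither aggregate in the sketch uses DISTINCT: the first one removes duplicates by grouping on the distinct key, so the second one can use a regular COUNT.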
For stream, a query with distinct aggregates is handled by SplitAggregateRule in FLINK-12161.
A query with grouping sets (or cube, rollup) will be rewritten into a regular aggregate with
an expand node, and the expand node duplicates the input data for each simple group.
e.g.
{noformat}
schema:
MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)

Original records:
+-----+-----+-----+-----+
|  a  |  b  |  c  |  d  |
+-----+-----+-----+-----+
|  1  |  1  | c1  | d1  |
+-----+-----+-----+-----+
|  1  |  2  | c1  | d2  |
+-----+-----+-----+-----+
|  2  |  1  | c1  | d1  |
+-----+-----+-----+-----+

SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)

logical plan after expanded:
LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
  LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
    LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
      LogicalNativeTableScan(table=[[builtin, default, MyTable]])

notes:
'$e = 1' is equivalent to 'group by a'
'$e = 2' is equivalent to 'group by c'

expanded records:
+------+-----+------+-----+
|  a   |  b  |  c   | $e  |
+------+-----+------+-----+ ---+---
|  1   |  1  | null |  1  |    |
+------+-----+------+-----+  records expanded by record1
| null |  1  |  c1  |  2  |    |
+------+-----+------+-----+ ---+---
|  1   |  2  | null |  1  |    |
+------+-----+------+-----+  records expanded by record2
| null |  2  |  c1  |  2  |    |
+------+-----+------+-----+ ---+---
|  2   |  1  | null |  1  |    |
+------+-----+------+-----+  records expanded by record3
| null |  1  |  c1  |  2  |    |
+------+-----+------+-----+ ---+---
{noformat}
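The expand-then-aggregate behaviour in the example above can be reproduced with a short simulation. This is a minimal sketch that mirrors the plan by hand, not the planner's implementation; the function names expand and aggregate are illustrative only.

```python
from collections import defaultdict

# Rows of MyTable as (a, b, c, d), taken from the example above.
rows = [
    (1, 1, "c1", "d1"),
    (1, 2, "c1", "d2"),
    (2, 1, "c1", "d1"),
]


def expand(row):
    """Emit one copy of the row per grouping set, nulling the columns that set does not group by."""
    a, b, c, _d = row
    yield (a, b, None, 1)  # $e = 1: group by a
    yield (None, b, c, 2)  # $e = 2: group by c


def aggregate(expanded):
    """Regular aggregate: GROUP BY (a, c, $e), computing SUM(b)."""
    sums = defaultdict(int)
    for a, b, c, e in expanded:
        sums[(a, c, e)] += b
    return dict(sums)


expanded = [out for row in rows for out in expand(row)]
grouped = aggregate(expanded)

# Final projection drops $e, as LogicalCalc does: (a, c, b)
final = [(a, c, total) for (a, c, _e), total in grouped.items()]
for record in final:
    print(record)
```

Because $e is part of the grouping key, rows that belong to different grouping sets never fall into the same group, even when their remaining key columns are both null.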
was:
This issue aims to support generating an optimized logical plan for grouping sets and distinct
aggregates (mentioned in [FLINK-12076|https://issues.apache.org/jira/browse/FLINK-12076] and
[FLINK-12098|https://issues.apache.org/jira/browse/FLINK-12098]).
For batch, a query with distinct aggregates will be rewritten into two non-distinct aggregates
by an extended [AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]:
the first aggregate computes the distinct result and the non-distinct aggregate function result,
and the second aggregate computes the distinct aggregate function result based on the first
aggregate's result. The first aggregate has grouping sets if there is more than one distinct
aggregate on different fields.
For stream, a query with distinct aggregates is handled by SplitAggregateRule in [FLINK-12161|https://issues.apache.org/jira/browse/FLINK-12161].
A query with grouping sets (or cube, rollup) will be rewritten into a regular aggregate with
expand.
The expand node duplicates the input data for each simple group.
e.g.
{noformat}
schema:
MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)

Original records:
+-----+-----+-----+-----+
|  a  |  b  |  c  |  d  |
+-----+-----+-----+-----+
|  1  |  1  | c1  | d1  |
+-----+-----+-----+-----+
|  1  |  2  | c1  | d2  |
+-----+-----+-----+-----+
|  2  |  1  | c1  | d1  |
+-----+-----+-----+-----+

SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)

logical plan after expanded:
LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
  LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
    LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
      LogicalNativeTableScan(table=[[builtin, default, MyTable]])

notes:
'$e = 1' is equivalent to 'group by a'
'$e = 2' is equivalent to 'group by c'

expanded records:
+------+-----+------+-----+
|  a   |  b  |  c   | $e  |
+------+-----+------+-----+ ---+---
|  1   |  1  | null |  1  |    |
+------+-----+------+-----+  records expanded by record1
| null |  1  |  c1  |  2  |    |
+------+-----+------+-----+ ---+---
|  1   |  2  | null |  1  |    |
+------+-----+------+-----+  records expanded by record2
| null |  2  |  c1  |  2  |    |
+------+-----+------+-----+ ---+---
|  2   |  1  | null |  1  |    |
+------+-----+------+-----+  records expanded by record3
| null |  1  |  c1  |  2  |    |
+------+-----+------+-----+ ---+---
{noformat}
> Add support for generating optimized logical plan for grouping sets and distinct aggregate
> 
>
> Key: FLINK-12192
> URL: https://issues.apache.org/jira/browse/FLINK-12192
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major

This message was sent by Atlassian JIRA
(v7.6.3#76005)
