flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-6473) Add OVER window support for batch tables
Date Tue, 24 Jul 2018 12:17:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-6473:
---------------------------------
    Description: 
Add support for OVER windows for batch tables. 

Since OVER windows are supported for streaming tables, this issue is not about the API (which
is available) but about adding the execution strategies and translation for OVER windows on
batch tables.
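
For reference, the OVER window API that already exists on streaming tables looks roughly as follows (a minimal sketch in the Scala Table API, assuming a hypothetical {{orders}} table with columns {{'user}}, {{'amount}} and a time attribute {{'rowtime}}). The work in this issue is about translating the same expressions into DataSet programs:

{code}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

object StreamingOverExample {
  // Hypothetical input table with columns 'user, 'amount and a time attribute 'rowtime.
  def unboundedOverSum(orders: Table): Table =
    orders
      // partition by user, order by the time attribute, aggregate over all preceding rows
      .window(Over partitionBy 'user orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
      .select('user, 'amount.sum over 'w)
}
{code}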

The feature could be implemented using the following plans:

*UNBOUNDED OVER*

{code}
DataSet[Row] input = ...
DataSet[Row] result = input
  .groupBy(partitionKeys)
  .sortGroup(orderByKeys)
  .reduceGroup(computeAggregates)
{code}

This implementation is quite straightforward because we don't need to retract rows.
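
For illustration, {{computeAggregates}} could be a {{GroupReduceFunction}} that makes a single forward pass over the sorted group. This is only a sketch, assuming rows of {{(partitionKey, value)}} and a running SUM as a stand-in for the code-generated aggregate functions:

{code}
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the code-generated aggregates: emits
// (partitionKey, value, runningSum) for every row of the sorted group.
class UnboundedOverSum extends GroupReduceFunction[(String, Long), (String, Long, Long)] {

  override def reduce(rows: java.lang.Iterable[(String, Long)],
                      out: Collector[(String, Long, Long)]): Unit = {
    var sum = 0L
    val it = rows.iterator()
    // The group is already sorted by the ORDER BY keys, so a single forward pass
    // yields the aggregate over the unbounded preceding range for each row.
    while (it.hasNext) {
      val row = it.next()
      sum += row._2
      out.collect((row._1, row._2, sum))
    }
  }
}
{code}

It could be wired into the plan above along the lines of {{input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new UnboundedOverSum)}}.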

*BOUNDED OVER*

BOUNDED OVER windows are a bit more challenging, because we need to retract values from the
aggregates and we don't want to store rows temporarily on the heap.

{code}
DataSet[Row] input = ...
DataSet[Row] sorted = input
  .partitionByHash(partitionKey)
  .sortPartition(partitionKeys, orderByKeys)
DataSet[Row] result = sorted.coGroup(sorted)
  .where(partitionKey).equalTo(partitionKey)
  .with(computeAggregates)
{code}

With this, the data set should be partitioned and sorted once. The sorted {{DataSet}} would
be consumed twice (the optimizer should inject a temp barrier on one of the inputs to avoid
a consumption deadlock). The {{CoGroupFunction}} would accumulate new rows into the aggregates
from one input and retract them from the other. Since both input streams are properly sorted,
this can happen in a zigzag fashion. We need to verify that the generated plan is what we want
it to be.
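
To illustrate the zigzag pattern, {{computeAggregates}} could be a {{CoGroupFunction}} roughly like the following sketch (assuming rows of {{(partitionKey, orderKey, value)}}, a SUM aggregate, and a ROWS window over the {{precedingRows}} preceding rows). Both iterators walk over the same sorted partition; the first accumulates the current row, the second lags behind and retracts rows that drop out of the window:

{code}
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the code-generated aggregates: a SUM over a ROWS window
// of `precedingRows` preceding rows plus the current row.
class BoundedOverSum(precedingRows: Int)
    extends CoGroupFunction[(String, Long, Long), (String, Long, Long), (String, Long, Long)] {

  override def coGroup(accumulateSide: java.lang.Iterable[(String, Long, Long)],
                       retractSide: java.lang.Iterable[(String, Long, Long)],
                       out: Collector[(String, Long, Long)]): Unit = {
    val accIt = accumulateSide.iterator()
    val retIt = retractSide.iterator()
    var sum = 0L
    var rowsInWindow = 0
    // Both iterators see the same sorted partition; the retract iterator lags behind
    // the accumulate iterator by the window size (zigzag pattern).
    while (accIt.hasNext) {
      val current = accIt.next()
      sum += current._3          // accumulate the current row
      rowsInWindow += 1
      if (rowsInWindow > precedingRows + 1) {
        sum -= retIt.next()._3   // retract the row that left the window
        rowsInWindow -= 1
      }
      out.collect((current._1, current._2, sum))
    }
  }
}
{code}

In the Scala DataSet API this would be wired up along the lines of {{sorted.coGroup(sorted).where(0).equalTo(0).apply(new BoundedOverSum(n))}}.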

  was:
Add support for OVER windows for batch tables. 

Since OVER windows are supported for streaming tables, this issue is not about the API (which
is available) but about adding the execution strategies and translation for OVER windows on
batch tables.


> Add OVER window support for batch tables
> ----------------------------------------
>
>                 Key: FLINK-6473
>                 URL: https://issues.apache.org/jira/browse/FLINK-6473
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
