flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2980) Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
Date Mon, 23 Jan 2017 14:13:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834598#comment-15834598 ]

ASF GitHub Bot commented on FLINK-2980:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3026#discussion_r97311604
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -928,5 +1032,151 @@ class GroupWindowedTable(
         val fieldExprs = ExpressionParser.parseExpressionList(fields)
         select(fieldExprs: _*)
       }
    +}
    +
    +/**
    +  * A table that has been grouped on several sets of grouping keys.
    +  */
    +class GroupingSetsTable(
    +  private[flink] val table: Table,
    +  private[flink] val groups: Seq[Seq[Expression]],
    +  private[flink] val sqlKind: SqlKind) {
    +
    +  /**
    +    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
    +    * The field expressions can contain complex expressions and aggregations.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   tab.groupingSets('key).select('key, 'value.avg + " The average" as 'average)
    +    * }}}
    +    */
    +  def select(fields: Expression*): Table = {
    +
    +    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
    +
    +    if (propNames.nonEmpty) {
    +      throw ValidationException("Window properties can only be used on windowed tables.")
    +    }
    +
    +    val groupingSets = sqlKind match {
    +      case SqlKind.CUBE => ExpressionUtils.cube(groups)
    +      case SqlKind.ROLLUP => ExpressionUtils.rollup(groups)
    +      case _ => groups
    +    }
    +
    +    val projectsOnAgg = replaceAggregationsAndProperties(
    +      fields, table.tableEnv, aggNames, propNames)
    +    val projectFields = extractFieldReferences(fields ++ groupingSets.flatten.distinct)
    +
    +    val logical =
    +      Project(projectsOnAgg,
    +        GroupingAggregation(groupingSets, aggNames.map(a => Alias(a._1, a._2)).toSeq,
    +                            Project(projectFields, table.logicalPlan).validate(table.tableEnv)
    +        ).validate(table.tableEnv)
    +      ).validate(table.tableEnv)
    +
    +    new Table(table.tableEnv, logical)
    +  }
    +
    +  /**
    +    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
    +    * The field expressions can contain complex expressions and aggregations.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   tab.groupBy("key").select("key, value.avg + ' The average' as average")
    +    * }}}
    +    */
    +  def select(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    select(fieldExprs: _*)
    +  }
     
    +  /**
    +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
    +    *
    +    * For streaming tables of infinite size, grouping into windows is required to define finite
    +    * groups on which group-based aggregates can be computed.
    +    *
    +    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
    +    * groupBy.
    +    *
    +    * @param groupWindow group-window that specifies how elements are grouped.
    +    * @return A windowed table.
    +    */
    +  def window(groupWindow: GroupWindow): GroupingSetsWindowedTable = {
    +    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw new ValidationException(s"Windows on batch tables are currently not supported.")
    +    }
    +    new GroupingSetsWindowedTable(table, groups, sqlKind, groupWindow)
    +  }
    +}
    +
    +class GroupingSetsWindowedTable(
    --- End diff --
    
    We decided not to support grouping sets in a stream environment yet, and there are also no tests for it. Could you remove this class?


> Add CUBE/ROLLUP/GROUPING SETS operator in Table API.
> ----------------------------------------------------
>
>                 Key: FLINK-2980
>                 URL: https://issues.apache.org/jira/browse/FLINK-2980
>             Project: Flink
>          Issue Type: New Feature
>          Components: Documentation, Table API & SQL
>            Reporter: Chengxiang Li
>            Assignee: Alexander Chermenin
>         Attachments: Cube-Rollup-GroupSet design doc in Flink.pdf
>
>
> Computing aggregates over a cube, a rollup, or grouping sets of several dimensions is a common operation in data warehousing. It would be nice to have them in the Table API.
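
For readers unfamiliar with the operators named in the issue, here is a minimal sketch of how ROLLUP and CUBE expand into plain grouping sets under standard SQL semantics. The object name GroupingSetsSketch and its two methods are hypothetical illustrations. They are not the ExpressionUtils.cube and ExpressionUtils.rollup helpers called in the diff, whose implementation is not shown in this message and which take Seq[Seq[Expression]] rather than a flat key list.

```scala
// Hypothetical illustration only; not code from the pull request.
object GroupingSetsSketch {

  // ROLLUP(a, b, c) yields every prefix of the key list, longest first:
  // (a, b, c), (a, b), (a), ()
  def rollup[A](keys: Seq[A]): Seq[Seq[A]] =
    (keys.length to 0 by -1).map(n => keys.take(n))

  // CUBE(a, b) yields every subset of the key list (2^n grouping sets):
  // (a, b), (a), (b), ()
  def cube[A](keys: Seq[A]): Seq[Seq[A]] =
    keys.foldRight(Seq(Seq.empty[A])) { (key, acc) =>
      // Each existing subset appears once with the key and once without it.
      acc.map(key +: _) ++ acc
    }

  def main(args: Array[String]): Unit = {
    println(rollup(Seq("a", "b", "c")))
    println(cube(Seq("a", "b")))
  }
}
```

Under this reading, GROUPING SETS is the general form: the caller lists the sets explicitly, which matches the fall-through case of the match on sqlKind in the diff, where groups is passed on unchanged.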



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
