flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] hequn8128 commented on a change in pull request #7920: [FLINK-11844][table-api] Simplify over window API classes and improve documentation
Date Sun, 10 Mar 2019 10:24:19 GMT
hequn8128 commented on a change in pull request #7920: [FLINK-11844][table-api] Simplify over
window API classes and improve documentation
URL: https://github.com/apache/flink/pull/7920#discussion_r264032047
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
 ##########
 @@ -20,90 +20,180 @@ package org.apache.flink.table.api
 
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.table.api.scala.{CURRENT_RANGE, CURRENT_ROW}
 
 /**
-  * Over window is similar to the traditional OVER SQL.
+  * An over-window specification.
+  *
+  * Similar to SQL, over window aggregates compute an aggregate for each input row over a range
+  * of its neighboring rows.
+  */
+class OverWindow(
+    alias: Expression,
+    partitionBy: Seq[Expression],
+    orderBy: Expression,
+    preceding: Expression,
+    following: Option[Expression]) {
+
+  def getAlias: Expression = alias
+
+  def getPartitioning: Seq[Expression] = partitionBy
+
+  def getOrder: Expression = orderBy
+
+  def getPreceding: Expression = preceding
+
+  def getFollowing: Option[Expression] = following
+}
+
+// ------------------------------------------------------------------------------------------------
+// Over windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Partially defined over window with partitioning.
+  */
+class OverWindowPartitioned(partitionBy: Seq[Expression]) {
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param orderByExpr field reference
+    * @return an over window with defined order
+    */
+  def orderBy(orderByExpr: String): OverWindowPartitionedOrdered = {
+    orderBy(ExpressionParser.parseExpression(orderByExpr))
+  }
+
+  /**
+    * Specifies the time attribute on which rows are ordered.
+    *
+    * For streaming tables, reference a rowtime or proctime time attribute here
+    * to specify the time mode.
+    *
+    * For batch tables, refer to a timestamp or long attribute.
+    *
+    * @param orderBy field reference
+    * @return an over window with defined order
+    */
+  def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
+    new OverWindowPartitionedOrdered(partitionBy, orderBy)
+  }
+}
+
+/**
+  * Partially defined over window with (optional) partitioning and order.
   */
-case class OverWindow(
-    private[flink] val alias: Expression,
-    private[flink] val partitionBy: Seq[Expression],
-    private[flink] val orderBy: Expression,
-    private[flink] val preceding: Expression,
-    private[flink] val following: Expression)
+class OverWindowPartitionedOrdered(partitionBy: Seq[Expression], orderBy: Expression) {
+
+  /**
+    * Set the preceding offset (based on time or row-count intervals) for over window.
+    *
+    * @param precedingExpr preceding offset relative to the current row.
+    * @return an over window with defined preceding
+    */
+  def preceding(precedingExpr: String): OverWindowPartitionedOrderedPreceding = {
+    preceding(ExpressionParser.parseExpression(precedingExpr))
+  }
+
+  /**
+    * Set the preceding offset (based on time or row-count intervals) for over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return an over window with defined preceding
+    */
+  def preceding(preceding: Expression): OverWindowPartitionedOrderedPreceding = {
+    new OverWindowPartitionedOrderedPreceding(partitionBy, orderBy, preceding)
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return the fully defined over window
+    */
+  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return the fully defined over window
+    */
+  def as(alias: Expression): OverWindow = {
+    new OverWindow(alias, partitionBy, orderBy, UnboundedRange(), None)
+  }
+}
 
 /**
-  * A partially defined over window.
+  * Partially defined over window with (optional) partitioning, order, and preceding.
   */
-class OverWindowWithPreceding(
+class OverWindowPartitionedOrderedPreceding(
     private val partitionBy: Seq[Expression],
     private val orderBy: Expression,
     private val preceding: Expression) {
 
-  private[flink] var following: Expression = _
+  private var optionalFollowing: Option[Expression] = None
 
   /**
     * Assigns an alias for this window that the following `select()` clause can refer to.
     *
     * @param alias alias for this over window
-    * @return over window
+    * @return the fully defined over window
     */
   def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
 
   /**
     * Assigns an alias for this window that the following `select()` clause can refer to.
     *
     * @param alias alias for this over window
-    * @return over window
+    * @return the fully defined over window
     */
   def as(alias: Expression): OverWindow = {
-
-    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
-    if (null == following) {
-      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
-        following = CURRENT_ROW
-      } else {
-        following = CURRENT_RANGE
-      }
-    }
-    OverWindow(alias, partitionBy, orderBy, preceding, following)
+    new OverWindow(alias, partitionBy, orderBy, preceding, optionalFollowing)
   }
 
   /**
     * Set the following offset (based on time or row-count intervals) for over window.
     *
-    * @param following following offset that relative to the current row.
-    * @return this over window
+    * @param followingExpr following offset relative to the current row.
+    * @return an over window with defined following
     */
-  def following(following: String): OverWindowWithPreceding = {
-    this.following(ExpressionParser.parseExpression(following))
+  def following(followingExpr: String): OverWindowPartitionedOrderedPreceding = {
 
 Review comment:
   Should we rename these xxxExpr parameters (orderByExpr, precedingExpr, followingExpr) to xxx?
   It seems strange to add the "Expr" suffix to the String parameters.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message