flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] wuchong commented on a change in pull request #8462: [FLINK-12496][table-planner-blink] Support translation from StreamExecGroupWindowAggregate to StreamTransformation.
Date Fri, 17 May 2019 15:24:16 GMT
wuchong commented on a change in pull request #8462: [FLINK-12496][table-planner-blink] Support
translation from StreamExecGroupWindowAggregate to StreamTransformation.
URL: https://github.com/apache/flink/pull/8462#discussion_r285169940
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
 ##########
 @@ -122,7 +135,236 @@ class StreamExecGroupWindowAggregate(
 
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val config = tableEnv.getConfig
+
+    val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+
+    val inputRowTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val outRowType = FlinkTypeFactory.toInternalRowType(outputRowType).toTypeInfo
+
+    val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
+
+    if (inputIsAccRetract) {
+      throw new TableException(
+        "Group Window Agg: Retraction on windowed GroupBy aggregation is not supported yet.
\n" +
+          "please re-check sql grammar. \n" +
+          "Note: Windowed GroupBy aggregation should not follow a" +
+          "non-windowed GroupBy aggregation.")
+    }
+
+    val isCountWindow = window match {
+      case TumblingGroupWindow(_, _, size) if isRowIntervalType(size.getType) => true
+      case SlidingGroupWindow(_, _, size, _) if isRowIntervalType(size.getType) => true
+      case _ => false
+    }
+
+    if (isCountWindow && grouping.length > 0 && config.getMinIdleStateRetentionTime
< 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+          "Please provide a query configuration with valid retention interval to prevent
" +
+          "excessive state size. You may specify a retention time of 0 to not clean up the
state.")
+    }
+
+    // validation
+    emitStrategy.checkValidation()
+
+    val aggString = RelExplainUtil.streamWindowAggregationToString(
+      inputRowType,
+      grouping,
+      outputRowType,
+      aggCalls,
+      namedProperties)
+
+    val timeIdx = if (isRowtimeIndicatorType(window.timeAttribute.getResultType)) {
+      if (inputTimestampIndex < 0) {
+        throw new TableException(
+          "Group Window Agg: Time attribute could not be found. \n" +
+          "Time attribute could not be found. This is a bug.\n" +
+            "please contact customer support for this"
 
 Review comment:
   Please polish the exception message. For example:
   
   "Group window aggregate must be defined on a time attribute, but the time attribute can't be found. This should never happen. Please file an issue."

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message