flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "godfrey he (Jira)" <j...@apache.org>
Subject [jira] [Closed] (FLINK-20487) Support to consume retractions for window aggregate operator
Date Tue, 18 May 2021 11:40:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-20487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

godfrey he closed FLINK-20487.
------------------------------
    Resolution: Fixed

Fixed in 1.14.0: 1a1a72911ae2862fbfd634c2d707a68e5d296cc8

> Support to consume retractions for window aggregate operator
> ------------------------------------------------------------
>
>                 Key: FLINK-20487
>                 URL: https://issues.apache.org/jira/browse/FLINK-20487
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: jiayue.yu
>            Assignee: Andy
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available
>             Fix For: 1.14.0
>
>
> {code}
> EXCEPTION: org.apache.flink.table.api.TableException: Group Window Aggregate: Retraction
on windowed GroupBy Aggregate is not supported yet. org.apache.flink.table.api.TableException:
Group Window Aggregate: Retraction on windowed GroupBy Aggregate is not supported yet. please
re-check sql grammar. Note: Windowed GroupBy Aggregate should not follow anon-windowed GroupBy
aggregation. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:138)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:91)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
> {code}
>  
> CASE:
> {code:sql}
> SELECT
>  DATE_FORMAT(tumble_end(ROWTIME ,interval '1' hour),'yyyy-MM-dd HH') as stat_time,
>  count(crypto_customer_number) first_phone_num
> FROM (
>  SELECT 
>  ROWTIME,
>  crypto_customer_number,
>  row_number() over(partition by crypto_customer_number order by ROWTIME ) as rn
>  FROM source_kafka_biz_shuidi_sdb_crm_call_record 
> ) cal 
> where rn =1
> group by tumble(ROWTIME,interval '1' hour);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message