flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mark You (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (FLINK-6862) Tumble window rowtime not resolve at logic plan validation
Date Wed, 07 Jun 2017 14:01:18 GMT

     [ https://issues.apache.org/jira/browse/FLINK-6862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Mark You closed FLINK-6862.
---------------------------
    Resolution: Duplicate

> Tumble window rowtime not resolve at logic plan validation
> ----------------------------------------------------------
>
>                 Key: FLINK-6862
>                 URL: https://issues.apache.org/jira/browse/FLINK-6862
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Mark You
>
> Following code sample work in version 1.2.1, but failed at 1.3.0
> {code:title= TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List<Content> data = new ArrayList<Content>();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>         DataStream<Content> stream = env.fromCollection(data);
>         DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1))
{
>                     /**
>                      * 
>                      */
>                     private static final long serialVersionUID = 410512296011057717L;
>                     @Override
>                     public long extractTimestamp(Content element) {
>                         return element.getRecordTime();
>                     }
>                 });
>         Table table = tableEnv.fromDataStream(stream2);
>         table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w").select("w.start,
content.count");
>         env.execute();
>     }
>     public static class Content implements Serializable {
>         private long recordTime;
>         private String content;
>         public Content() {
>             super();
>         }
>         public Content(long recordTime, String content) {
>             super();
>             this.recordTime = recordTime;
>             this.content = content;
>         }
>         public long getRecordTime() {
>             return recordTime;
>         }
>         public void setRecordTime(long recordTime) {
>             this.recordTime = recordTime;
>         }
>         public String getContent() {
>             return content;
>         }
>         public void setContent(String content) {
>             this.content = content;
>         }
>     }
>     private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]>
{
>         /**
>          * 
>          */
>         private static final long serialVersionUID = 1L;
>         @Override
>         public long extractTimestamp(Object[] element, long previousElementTimestamp)
{
>             // TODO Auto-generated method stub
>             return (long) element[0];
>         }
>         @Override
>         public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp)
{
>             return new Watermark(extractedTimestamp);
>         }
>     }
> }
> {code}
> Exception trace:
> {noformat}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve
[rowtime] given input [content, recordTime].
> 	at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:86)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
> 	at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
> 	at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:132)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:131)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> 	at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> 	at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> 	at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
> 	at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
> 	at org.apache.flink.table.plan.logical.Project.validate(operators.scala:67)
> 	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1054)
> 	at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1073)
> 	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:54)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message