flink-issues mailing list archives

From "Timo Walther (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5735) Non-overlapping sliding window is not deterministic
Date Thu, 12 Jul 2018 12:55:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541586#comment-16541586 ]

Timo Walther commented on FLINK-5735:
-------------------------------------

Thanks for looking into this [~Valerii_Florov]. I gave you contributor permissions. You can
now assign issues to yourself.

> Non-overlapping sliding window is not deterministic
> ---------------------------------------------------
>
>                 Key: FLINK-5735
>                 URL: https://issues.apache.org/jira/browse/FLINK-5735
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Priority: Major
>
> I don't know if this is a problem of the Table API or the underlying API. We have to investigate this as part of the issue.
> The following code leads to different results from time to time. Sometimes the count of "Hello" is 1, sometimes 2.
> {code}
>   val data = List(
>     (1L, 1, "Hi"),
>     (2L, 2, "Hallo"),
>     (3L, 2, "Hello"),
>     (6L, 3, "Hello"),
>     (4L, 5, "Hello"),
>     (16L, 4, "Hello world"),
>     (8L, 3, "Hello world"))
>   @Test
>   def testEventTimeSlidingWindowNonOverlapping(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>     val table = stream.toTable(tEnv, 'long, 'int, 'string)
>     val windowedTable = table
>       .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>     val expected = Seq(
>       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
>     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
>   }
>   class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>     override def checkAndGetNextWatermark(
>         lastElement: (Long, Int, String),
>         extractedTimestamp: Long)
>       : Watermark = {
>       new Watermark(extractedTimestamp)
>     }
>     override def extractTimestamp(
>         element: (Long, Int, String),
>         previousElementTimestamp: Long): Long = {
>       element._1
>     }
>   }
> {code}
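
As a cross-check on the expected values in the reproducer above, here is a minimal plain-Scala sketch of how a 5 ms window that starts every 10 ms maps the test rows to windows. It does not use Flink. `SlidingWindowSketch`, `windowStarts`, and `counts` are hypothetical names introduced here, and the arithmetic assumes non-negative timestamps and a zero window offset. It ignores arrival order and watermarks, so it shows only the order-independent result.

```scala
// Plain-Scala sketch with no Flink dependency. All names are illustrative.
object SlidingWindowSketch {
  val size = 5L   // window length in ms (Slide over 5.milli)
  val slide = 10L // window period in ms (every 10.milli)

  // Starts of all windows [start, start + size) that contain the timestamp.
  // Assumption: ts >= 0 and window offset 0.
  def windowStarts(ts: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Same rows as in the reproducer: (timestamp, int, string).
  val data = List(
    (1L, 1, "Hi"), (2L, 2, "Hallo"), (3L, 2, "Hello"), (6L, 3, "Hello"),
    (4L, 5, "Hello"), (16L, 4, "Hello world"), (8L, 3, "Hello world"))

  // Rows per (window start, string), ignoring arrival order and watermarks.
  val counts: Map[(Long, String), Int] =
    data
      .flatMap { case (ts, _, s) => windowStarts(ts).map(start => (start, s)) }
      .groupBy(identity)
      .map { case (key, rows) => key -> rows.size }

  def main(args: Array[String]): Unit =
    counts.toSeq.sorted.foreach(println)
}
```

Under these assumptions only the rows with timestamps 1 to 4 fall into the window [0, 5); timestamps 6, 8, and 16 land in the gaps between windows. The row with timestamp 4 arrives after the row with timestamp 6 in the input, and the assigner emits a watermark equal to each timestamp, so whether that row is still counted may depend on watermark timing. That is a possible lead to investigate, not a confirmed diagnosis.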



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
