flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8158) Rowtime window inner join emits late data
Date Wed, 29 Nov 2017 04:28:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270116#comment-16270116 ]

ASF GitHub Bot commented on FLINK-8158:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5094
  
    Hi @hequn8128, let me try to explain this.
    
    1. In the current implementation, the join process relies only on the cached rows, not
on the watermarks. Specifically, when it receives a record, the join function only checks
whether the opposite cache contains qualifying rows, regardless of lateness. Thus, if
the cache ***is not cleaned up in time***, outdated results will be emitted.
    
    2. Strictly speaking, the value for holding back watermarks should be reported dynamically
by the join function at runtime. The current implementation temporarily uses a static value
(`MaxOutputDelay`) for that. In other words, the hold-back value should be determined by the
cached rows, rather than the cache size being determined by `MaxOutputDelay`.
    
    Hope that helps.
    
    Best, Xingcan
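
Point 1 of the comment above can be illustrated with a minimal sketch. It is a simplified, hypothetical model and not Flink's actual join operator: `Row`, `qualifies`, and `probeRightCache` are names invented here, the window bounds come from the SQL in the issue description, and the sketch assumes the cached right row was already due for eviction. The thing to notice is that no watermark appears anywhere in the probe, so what gets emitted depends only on what is still cached.

```scala
object CacheOnlyProbe {
  // Hypothetical row shape: (key, id, rowtime in ms), mirroring the test data in the issue.
  final case class Row(key: String, id: String, rt: Long)

  // Bounds from: t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND t2.rt + INTERVAL '1' SECOND
  val lowerMs = 5000L
  val upperMs = 1000L

  // Join predicate only: equal keys and left rowtime inside the window around the right row.
  def qualifies(left: Row, right: Row): Boolean =
    left.key == right.key &&
      left.rt >= right.rt - lowerMs &&
      left.rt <= right.rt + upperMs

  // Probes the opposite cache without consulting any watermark,
  // so a late left row still produces output if a matching row is cached.
  def probeRightCache(left: Row, rightCache: Seq[Row]): Seq[(String, String, String)] =
    rightCache.filter(qualifies(left, _)).map(r => (r.key, r.id, left.id))

  val incoming = Row("A", "LEFT1", 1000L)

  // Assumption: this right row should already have been evicted.
  val staleCache = Seq(Row("A", "RIGHT1", 6000L))
  val cleanedCache = Seq.empty[Row]

  val fromStale = probeRightCache(incoming, staleCache)     // one joined row
  val fromCleaned = probeRightCache(incoming, cleanedCache) // no output
}
```

In this model, timely cache cleanup is the only thing that prevents the outdated result, which is the dependency the comment describes.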


> Rowtime window inner join emits late data
> -----------------------------------------
>
>                 Key: FLINK-8158
>                 URL: https://issues.apache.org/jira/browse/FLINK-8158
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>         Attachments: screenshot-1xxx.png
>
>
> When executing the join, the join operator needs to make sure that no late data is emitted.
Currently, this is achieved by holding back watermarks. However, the window border is not handled
correctly. For the SQL below:
> {quote}
>     val sqlQuery =
>       """
>         SELECT t2.key, t2.id, t1.id
>         FROM T1 as t1 join T2 as t2 ON
>           t1.key = t2.key AND
>           t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
>             t2.rt + INTERVAL '1' SECOND
>         """.stripMargin
>     val data1 = new mutable.MutableList[(String, String, Long)]
>     // for boundary test
>     data1.+=(("A", "LEFT1", 6000L))
>     val data2 = new mutable.MutableList[(String, String, Long)]
>     data2.+=(("A", "RIGHT1", 6000L))
> {quote}
> The join will output a watermark with timestamp 1000, but if another left record
("A", "LEFT1", 1000L) then arrives, the join will output a record with timestamp 1000, which equals
the previously emitted watermark and is therefore late.
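
The boundary case described above can be worked through as plain arithmetic. This is a sketch under stated assumptions, not code from Flink: the input watermark of 6000 and the hold-back of 5000 are inferred from the figures in the description (rows at 6000, emitted watermark 1000), and the output timestamp of 1000 is taken from the description as given. A record counts as late here when its timestamp is less than or equal to the last emitted watermark.

```scala
object BoundaryCase {
  // Assumption: both inputs have advanced their watermark to 6000 ms.
  val inputWatermark = 6000L
  // Assumption: static hold-back equal to the larger window bound (5 s).
  val holdBack = 5000L
  val emittedWatermark = inputWatermark - holdBack

  // Rows from the issue description.
  val rightRt = 6000L
  val leftRt = 1000L

  // t1.rt BETWEEN t2.rt - 5000 AND t2.rt + 1000; the lower bound is inclusive.
  val joins = leftRt >= rightRt - 5000L && leftRt <= rightRt + 1000L

  // The description states that the joined record carries timestamp 1000.
  val outputTimestamp = 1000L

  // A timestamp equal to the watermark is already late.
  val isLate = outputTimestamp <= emittedWatermark
}
```

Because the lower window bound is inclusive, the left row at exactly `rightRt - 5000` still joins, while the emitted watermark has already reached the same value; that coincidence is the mishandled window border.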



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
