flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
Date Wed, 06 Sep 2017 10:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155127#comment-16155127
] 

ASF GitHub Bot commented on FLINK-6233:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r137225205
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
---
    @@ -55,8 +55,10 @@ class DataStreamWindowJoinRule
     
         if (windowBounds.isDefined) {
           if (windowBounds.get.isEventTime) {
    -        // we cannot handle event-time window joins yet
    -        false
    +        val procTimeAttrInOutput = join.getRowType.getFieldList.asScala
    +          .exists(f => FlinkTypeFactory.isProctimeIndicatorType(f.getType))
    +
    +        !remainingPredsAccessTime && !procTimeAttrInOutput
    --- End diff --
    
    No, the problem is that these rowtime attributes after a proc-time join won't be aligned
with the watermarks anymore. We would need to hold back watermarks based on the data in the
caches and not based on the window boundaries. 
    
    Keeping the proctime attributes is fine because they are not bound to watermarks.


> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time streams to
the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}
only can use rowtime that is a system attribute, the time condition only support bounded time
range like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1'
HOUR}}, not support unbounded like {{o.rowtime &lt; s.rowtime}} ,  and  should include
both two stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + 1}} should
also not be supported.
> An row-time streams join will not be able to handle late data, because this would mean
in insert a row into a sorted order shift all other computations. This would be too expensive
to maintain. Therefore, we will throw an error if a user tries to use an row-time stream join
with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message