flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hequn Cheng (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join
Date Tue, 11 Dec 2018 02:30:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716015#comment-16716015
] 

Hequn Cheng commented on FLINK-11070:
-------------------------------------

[~fhueske] Thanks for your quick reply! 
Ok, I got your point. It also makes sense to notify users about the potential performance
risk. Once we support join reordering, we can remove the switches.

Best, Hequn

> Add stream-stream non-window cross join
> ---------------------------------------
>
>                 Key: FLINK-11070
>                 URL: https://issues.apache.org/jira/browse/FLINK-11070
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. This is
fine for most of the cases, however, it limits the set of supported SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> val sqlQuery =
>       """
>         |SELECT t1.a, t3.b
>         |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>         |WHERE t1.a = t3.a AND t1.a = t2.a
>         |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not supported
and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for
the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
> {code}
> In order to support more queries, it would be nice to have cross join on streaming. We
can start from a simple version, for example, call forceNonParallel() for connectOperator
in `DataStreamJoin` when it is a cross join. The performance may be bad. But it works fine
if the two tables of cross join are small ones. 
> We can do some optimizations later, such as broadcasting the smaller side, etc.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message