flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join
Date Fri, 07 Dec 2018 07:58:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712454#comment-16712454 ]

Fabian Hueske commented on FLINK-11070:

Thanks for this issue [~hequn8128]!

I think we definitely have to support cross joins at some point. However, I think we should
not do that (by default) before we have added support for join reordering.
The example that you gave in the description can be executed with two equi joins and should
not be executed with a cross join. However, the planner will not find the efficient plan without
join reordering. If we enable cross joins by default, the plan would simply be translated
as written and executed with a (non-parallel) cross join. A cross join should (typically) only
be used if a query cannot be executed in any other way.
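
To make the point about table order concrete, here is a minimal sketch in plain Scala with no Flink dependency. It assumes joins are planned left-deep in exactly the FROM order, and it reports which table references have no equi-join predicate linking them to the tables joined before them. `JoinOrderCheck`, `EquiPred`, and `crossJoined` are hypothetical names made up for this illustration; they are not part of the Flink planner.

```scala
// Hypothetical helper, NOT part of Flink: checks whether joining tables in the
// given FROM order (left-deep, no reordering) forces a cross join.
object JoinOrderCheck {
  // An equi-join predicate between two table aliases, e.g. t1.a = t3.a.
  final case class EquiPred(left: String, right: String)

  // Returns the aliases that can only be attached with a cross join,
  // because no predicate links them to any table joined earlier.
  def crossJoined(fromOrder: Seq[String], preds: Seq[EquiPred]): Seq[String] =
    fromOrder match {
      case Seq() => Seq.empty
      case first +: rest =>
        val (_, offenders) =
          rest.foldLeft((Set(first), Vector.empty[String])) {
            case ((joined, bad), t) =>
              val connected = preds.exists { p =>
                (p.left == t && joined(p.right)) || (p.right == t && joined(p.left))
              }
              (joined + t, if (connected) bad else bad :+ t)
          }
        offenders
    }

  def main(args: Array[String]): Unit = {
    // WHERE t1.a = t3.a AND t1.a = t2.a
    val preds = Seq(EquiPred("t1", "t3"), EquiPred("t1", "t2"))
    println(crossJoined(Seq("t3", "t2", "t1"), preds)) // FROM order used in the issue
    println(crossJoined(Seq("t1", "t3", "t2"), preds)) // t1 moved to the front
  }
}
```

For the FROM order in the issue (`t3, t2, t1`) the check flags `t2`, since no predicate connects `t2` to `t3`. With `t1` listed first, every later table is linked to `t1` by an equi predicate and nothing is flagged, which corresponds to the two equi joins mentioned above.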

I would propose that, until we have support for proper join reordering, we:
1. Give a better error message if a query can only be executed with a cross join and explain
that users should try to reorder the table references or enable cross joins.
2. Add a switch to support cross joins. I would make this join parallel and always broadcast
the second input table (since we do not reorder the joins, this will be deterministic).
3. Possibly add a switch to enable join reordering, with a BIG warning sign that the resulting
order can be very suboptimal.
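
The idea behind item 2 can be sketched with plain Scala collections standing in for parallel subtasks. This is an illustration only, under simplifying assumptions: inputs are finite, and state handling, retractions, and late data are ignored, all of which a real streaming operator would have to deal with. `BroadcastCrossJoin` and `crossJoin` are hypothetical names, not Flink APIs.

```scala
// Sketch only: plain collections stand in for parallel stream partitions.
object BroadcastCrossJoin {
  // Splits `left` round-robin into `parallelism` partitions, gives every
  // partition a full copy of `right` (the "broadcast" side), and
  // cross-joins locally within each partition.
  def crossJoin[A, B](left: Seq[A], right: Seq[B], parallelism: Int): Seq[(A, B)] = {
    require(parallelism > 0, "parallelism must be positive")
    val partitions = left.zipWithIndex
      .groupBy { case (_, i) => i % parallelism }
      .values
      .map(_.map(_._1))
    // Each partition sees all of `right`, so the union of the local results
    // is the complete cross product without any duplicates.
    partitions.toSeq.flatMap(part => for (l <- part; r <- right) yield (l, r))
  }
}
```

Because only the first input is partitioned and the second is copied in full, each output pair is produced by exactly one partition, and which side gets copied is fixed by the order in which the user listed the tables.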

Once we support join reordering (with reasonable cardinality estimates), we can remove the
switches and get good behavior without any configuration.

What do you think?

> Add stream-stream non-window cross join
> ---------------------------------------
>                 Key: FLINK-11070
>                 URL: https://issues.apache.org/jira/browse/FLINK-11070
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
> Currently, we don't reorder joins and rely on the order provided by the user. This is
fine in most cases; however, it limits the set of supported SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> val sqlQuery =
>       """
>         |SELECT t1.a, t3.b
>         |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>         |WHERE t1.a = t3.a AND t1.a = t2.a
>         |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not supported
and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for
the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
> {code}
> In order to support more queries, it would be nice to support cross joins on streams. We
can start from a simple version, for example, call forceNonParallel() for connectOperator
in `DataStreamJoin` when it is a cross join. The performance may be bad, but it works fine
if the two tables of the cross join are small.
> We can do some optimizations later, such as broadcasting the smaller side, etc.
> Any suggestions are greatly appreciated.

This message was sent by Atlassian JIRA
