flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hequn Cheng (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-11070) Add stream-stream non-window cross join
Date Tue, 11 Dec 2018 01:07:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715884#comment-16715884
] 

Hequn Cheng edited comment on FLINK-11070 at 12/11/18 1:06 AM:
---------------------------------------------------------------

[~fhueske] Hi, thanks for the feedback and sharing the thoughts.

As for the example, I think I should add more details.
 * Table2 and table3 are very small tables, such as a table with only one row. And table1
is a very big table, such as with 1M rows.
 * All these rows contain same join keys.

So the cost for cross join and non-cross join would be 1*1+1*1M(cross join) VS 1*1M+1M*1(non-cross
join). This means the query may be executed with a cross join with a better performance.

Adding a switch is a good idea. This force our users to pay more attention to the performance
of cross join. But it may also bring some inconvenience. We can't remove the switch even Flink
support join reordering. Because there is a chance the cardinality estimates have not been
passed by the user. So once we can't get the cardinality estimates, the user has to configure
the switch to enable a cross join if he does want to use cross join. From this point of view,
I think we should not have the switch. 
 I would propose that:
 * Don't enable join reordering in general because reordering without cardinality estimates
is gambling
 * Trust the query written by the user as we don't have cardinality estimates. And we don't
need to add a switch to bring inconvenience to the user. 

What do you think?

To get a better performance, I think making the join parallel is a good idea. I will take
an investigate on it. Thanks a lot for your suggestions. 

Best, Hequn


was (Author: hequn8128):
[~fhueske] Hi, thanks for the feedback and sharing the thoughts.

As for the example, I think I should add more details.
 * Table2 and table3 are very small tables, such as a table with only one row. And table1
is a very big table, such as with 1M rows.
 * All these rows contain same join keys.

So the cost for cross join and non-cross join would be 1*1*1M(cross join) VS 1*1M*1M(non-cross
join). This means the query may be executed with a cross join with a much better performance.

Adding a switch is a good idea. This force our users to pay more attention to the performance
of cross join. But it may also bring some inconvenience. We can't remove the switch even Flink
support join reordering. Because there is a chance the cardinality estimates have not been
passed by the user. So once we can't get the cardinality estimates, the user has to configure
the switch to enable a cross join if he does want to use cross join. From this point of view,
I think we should not have the switch. 
I would propose that:
 * Don't enable join reordering in general because reordering without cardinality estimates
is gambling
 * Trust the query written by the user as we don't have cardinality estimates. And we don't
need to add a switch to bring inconvenience to the user. 

What do you think?

To get a better performance, I think making the join parallel is a good idea. I will take
an investigate on it. Thanks a lot for your suggestions. 

Best, Hequn

> Add stream-stream non-window cross join
> ---------------------------------------
>
>                 Key: FLINK-11070
>                 URL: https://issues.apache.org/jira/browse/FLINK-11070
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> Currently, we don't reorder join and rely on the order provided by the user. This is
fine for most of the cases, however, it limits the set of supported SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> val sqlQuery =
>       """
>         |SELECT t1.a, t3.b
>         |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>         |WHERE t1.a = t3.a AND t1.a = t2.a
>         |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not supported
and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for
the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
> {code}
> In order to support more queries, it would be nice to have cross join on streaming. We
can start from a simple version, for example, call forceNonParallel() for connectOperator
in `DataStreamJoin` when it is a cross join. The performance may be bad. But it works fine
if the two tables of cross join are small ones. 
> We can do some optimizations later, such as broadcasting the smaller side, etc.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message