flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-11020) Reorder joins only to eliminate cross joins
Date Mon, 03 Dec 2018 13:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-11020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707180#comment-16707180 ]

Fabian Hueske commented on FLINK-11020:

[~hequn8128], the query that [~twalthr] used in the description can be executed without a
cross product. 
I think the question is not whether we should enable cross products (which is a valid question
as well!) but whether we should enable join reordering to prevent cross products.

I don't think that we should enable join reordering in general, because reordering without
cardinality estimates is gambling and we should keep the join order stable.
If we find a query that cannot be planned, we could try to optimize it with join reordering
enabled, or we could raise a meaningful exception explaining the issue to the user.

I'm not sure what the best approach is, but silently enabling join reordering (without stats)
is something that I'm not very comfortable with.
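
To make the two options concrete, here is a minimal sketch of "keep the user's order unless it forces a cross product". It is a hypothetical model and not Flink or Calcite code: tables are reduced to their aliases, each equi-join predicate is reduced to the pair of aliases it connects, and plans are assumed to be left-deep in FROM order. The names {{JoinOrderSketch}}, {{needsCrossJoin}} and {{reorderOnlyIfNeeded}} are made up for illustration.

```scala
// Hypothetical model, not Flink or Calcite code: tables are aliases, and each
// equi-join predicate is reduced to the pair of aliases it connects.
object JoinOrderSketch {
  type Pred = (String, String)

  // True if `table` shares at least one predicate with an already joined table.
  private def connected(joined: Set[String], table: String, preds: Seq[Pred]): Boolean =
    preds.exists { case (l, r) =>
      (l == table && joined(r)) || (r == table && joined(l))
    }

  // A left-deep plan in the given order needs a cross join if some table
  // (after the first) has no predicate linking it to the tables before it.
  def needsCrossJoin(order: Seq[String], preds: Seq[Pred]): Boolean =
    order.indices.drop(1).exists { i =>
      !connected(order.take(i).toSet, order(i), preds)
    }

  // Keep the user's order when it already works. Otherwise greedily pick the
  // earliest remaining table (in user order) that is connected to the joined
  // set. Returns None if the join graph is disconnected, i.e. a cross join
  // cannot be avoided by reordering.
  def reorderOnlyIfNeeded(order: Seq[String], preds: Seq[Pred]): Option[Seq[String]] =
    if (order.isEmpty || !needsCrossJoin(order, preds)) Some(order)
    else {
      @annotation.tailrec
      def loop(joined: Vector[String], rest: Vector[String]): Option[Seq[String]] =
        if (rest.isEmpty) Some(joined)
        else rest.find(t => connected(joined.toSet, t, preds)) match {
          case Some(t) => loop(joined :+ t, rest.filterNot(_ == t))
          case None    => None
        }
      loop(Vector(order.head), order.tail.toVector)
    }
}
```

For the query in the description, {{reorderOnlyIfNeeded(Seq("t3", "t2", "t1"), Seq(("t1", "t3"), ("t1", "t2")))}} yields the order t3, t1, t2, while an order that already avoids a cross product is returned unchanged. The {{None}} case is where a meaningful exception would be raised instead.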

> Reorder joins only to eliminate cross joins 
> --------------------------------------------
>                 Key: FLINK-11020
>                 URL: https://issues.apache.org/jira/browse/FLINK-11020
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Priority: Major
> Currently, we don't reorder joins and rely on the order provided by the user. This is
> fine for most cases; however, it limits the set of supported SQL queries.
> Example:
> {code}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
> val sqlQuery =
>       """
>         |SELECT t1.a, t3.b
>         |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>         |WHERE t1.a = t3.a AND t1.a = t2.a
>         |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join, which is not supported
> and thus leads to:
> {code}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL features.
> {code}
> Introducing {{JoinPushThroughJoinRule}} would help, but it should only be applied if a cross
> join is the only alternative.

This message was sent by Atlassian JIRA