flink-issues mailing list archives

From "Hequn Cheng (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-11070) Add stream-stream non-window cross join
Date Tue, 04 Dec 2018 12:32:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hequn Cheng updated FLINK-11070:
--------------------------------
    Description: 
Currently, we don't reorder joins and instead rely on the join order provided by the user. This is fine
in most cases; however, it limits the set of supported SQL queries.
Example:
{code:java}
val streamUtil: StreamTableTestUtil = streamTestUtil()
streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
val sqlQuery =
      """
        |SELECT t1.a, t3.b
        |FROM MyTable3 t3, MyTable2 t2, MyTable t1
        |WHERE t1.a = t3.a AND t1.a = t2.a
        |""".stripMargin

streamUtil.printSql(sqlQuery)
{code}
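Given the current rule sets, this query produces a cross join: the joins follow the FROM order, and t3 and t2
are not connected by any predicate. Cross joins are not supported on streams, so the query fails with: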
{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

LogicalProject(a=[$8], b=[$1])
  LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
    LogicalJoin(condition=[true], joinType=[inner])
      LogicalJoin(condition=[true], joinType=[inner])
        LogicalTableScan(table=[[_DataStreamTable_2]])
        LogicalTableScan(table=[[_DataStreamTable_1]])
      LogicalTableScan(table=[[_DataStreamTable_0]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
{code}
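To read the `$n` field references in the plan above, here is a minimal sketch. It assumes each registered table contributes the four fields a, b, c, proctime in declaration order and that the joined row concatenates the tables in FROM order (t3, t2, t1). The object `FieldIndexSketch` is hypothetical. Under these assumptions, neither filter conjunct refers only to t3 and t2, so nothing can serve as a predicate for the lower join.

```scala
// Hypothetical sketch: resolves the flat $n field indices of the joined row
// back to (alias, column), assuming each table has fields a, b, c, proctime
// and the joined row concatenates the tables in FROM order: t3, t2, t1.
object FieldIndexSketch {
  val columns: Seq[String] = Seq("a", "b", "c", "proctime")
  val fromOrder: Seq[String] = Seq("t3", "t2", "t1")

  // Position n of the joined row -> (table alias, column name).
  val layout: IndexedSeq[(String, String)] =
    fromOrder.flatMap(t => columns.map(c => (t, c))).toIndexedSeq

  // Aliases referenced by an equality conjunct "$i = $j".
  def aliasesOf(i: Int, j: Int): Set[String] = Set(layout(i)._1, layout(j)._1)

  // The two conjuncts of the LogicalFilter: =($8, $0) and =($8, $4).
  val conjuncts: Seq[(Int, Int)] = Seq((8, 0), (8, 4))

  // True if some conjunct references only t3 and t2, i.e. could be a predicate
  // for the lower join of those two tables.
  def lowerJoinHasPredicate: Boolean =
    conjuncts.exists { case (i, j) => aliasesOf(i, j).subsetOf(Set("t3", "t2")) }

  def main(args: Array[String]): Unit = {
    println(layout(8))             // (t1,a)
    println(layout(0))             // (t3,a)
    println(layout(4))             // (t2,a)
    println(layout(1))             // (t3,b)
    println(lowerJoinHasPredicate) // false
  }
}
```

The projection `a=[$8], b=[$1]` resolves to t1.a and t3.b in the same way, matching the SELECT list.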

In order to support more queries, it would be nice to support cross joins on streams. We can
start with a simple version, for example, calling forceNonParallel() on connectOperator in `DataStreamJoin`
when the join is a cross join. The performance may be poor, but it works fine if both input tables of
the cross join are small.
We can do some optimizations later, such as broadcasting the smaller side.
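The simple non-parallel version can be modelled outside of Flink by a single operator that buffers every row from both inputs and joins each new row against everything buffered on the other side. This is a minimal sketch that assumes append-only inputs (no retractions, no state retention). The class `CrossJoinSketch` is hypothetical and is neither `DataStreamJoin` nor any Flink API. Running it as one instance corresponds to the forceNonParallel() idea: with no join key there is nothing to hash-partition on, so without broadcasting a single instance has to see every row of both sides.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, append-only model of a non-window stream-stream cross join
// executed by a single (non-parallel) operator instance.
final class CrossJoinSketch[L, R] {
  // Unbounded state: every row seen so far on each side is kept.
  private val leftRows = ArrayBuffer.empty[L]
  private val rightRows = ArrayBuffer.empty[R]

  // A row arrives on the left input: store it, join it with all buffered right rows.
  def processLeft(row: L): Seq[(L, R)] = {
    leftRows += row
    rightRows.toList.map(r => (row, r))
  }

  // A row arrives on the right input: store it, join it with all buffered left rows.
  def processRight(row: R): Seq[(L, R)] = {
    rightRows += row
    leftRows.toList.map(l => (l, row))
  }

  // Total number of buffered rows across both sides.
  def stateSize: Int = leftRows.size + rightRows.size
}

object CrossJoinSketchDemo {
  def main(args: Array[String]): Unit = {
    val join = new CrossJoinSketch[String, Int]
    println(join.processLeft("x")) // List()
    println(join.processRight(1))  // List((x,1))
    println(join.processRight(2))  // List((x,2))
    println(join.processLeft("y")) // List((y,1), (y,2))
  }
}
```

Each pair is emitted exactly once, when the later of its two rows arrives. The state therefore holds |L| + |R| rows, and the total output is |L| × |R| rows, which is why this approach is only reasonable for small tables. Broadcasting the smaller side would presumably let the larger side keep its parallelism, at the cost of every instance holding a full copy of the smaller side.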

Any suggestions are greatly appreciated.

  was:
Currently, we don't reorder joins and instead rely on the join order provided by the user. This is fine
in most cases; however, it limits the set of supported SQL queries.
Example:
{code:java}
val streamUtil: StreamTableTestUtil = streamTestUtil()
streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 'proctime.proctime)
streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 'proctime.proctime)
val sqlQuery =
      """
        |SELECT t1.a, t3.b
        |FROM MyTable3 t3, MyTable2 t2, MyTable t1
        |WHERE t1.a = t3.a AND t1.a = t2.a
        |""".stripMargin

streamUtil.printSql(sqlQuery)
{code}
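Given the current rule sets, this query produces a cross join: the joins follow the FROM order, and t3 and t2
are not connected by any predicate. Cross joins are not supported on streams, so the query fails with: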
{code:java}
org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

LogicalProject(a=[$8], b=[$1])
  LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
    LogicalJoin(condition=[true], joinType=[inner])
      LogicalJoin(condition=[true], joinType=[inner])
        LogicalTableScan(table=[[_DataStreamTable_2]])
        LogicalTableScan(table=[[_DataStreamTable_1]])
      LogicalTableScan(table=[[_DataStreamTable_0]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
{code}

In order to support more queries, it would be nice to support cross joins on streams. We can
start with a simple version, for example, calling forceNonParallel() on connectOperator in `DataStreamJoin`
when the join is a cross join. The performance may be poor, but it works fine if both input tables of
the cross join are small. In this example, tables t2 and t3 are much smaller than t1, so supporting
a cross join would be a good choice.

Optimizations can be done later, such as broadcasting the smaller side.

Any suggestions are greatly appreciated.


> Add stream-stream non-window cross join
> ---------------------------------------
>
>                 Key: FLINK-11070
>                 URL: https://issues.apache.org/jira/browse/FLINK-11070
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
