flink-issues mailing list archives

From "Piotr Nowojski (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-10734) Temporal joins on heavily filtered tables might fail in planning
Date Wed, 31 Oct 2018 11:42:00 GMT
Piotr Nowojski created FLINK-10734:
--------------------------------------

             Summary: Temporal joins on heavily filtered tables might fail in planning
                 Key: FLINK-10734
                 URL: https://issues.apache.org/jira/browse/FLINK-10734
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.7.0
            Reporter: Piotr Nowojski


The following query:
{code}
    val sqlQuery =
      """
        |SELECT
        |  o.amount * r.rate AS amount
        |FROM
        |  Orders AS o,
        |  LATERAL TABLE (Rates(o.rowtime)) AS r
        |WHERE r.currency = o.currency
        |""".stripMargin
{code}
with {{Rates}} defined as follows:
{code}
    tEnv.registerTable("EuroRatesHistory", tEnv.scan("RatesHistory").filter('currency === "Euro"))
    tEnv.registerFunction(
      "Rates",
      tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 'currency))
{code}
will fail with:
{noformat}
org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, rowtime, currency, rate, rowtime0))]

 at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
 at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
 at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)

{noformat}
The problem is that the filter condition {{('currency === "Euro")}} interferes with the join
condition ({{r.currency = o.currency}}), which the optimizer simplifies until no join key is left.
Note how the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization and finally
disappears from above the join:
{noformat}
LogicalProject(amount=[*($0, $4)])
  LogicalFilter(condition=[=($3, $1)])
    LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
      LogicalTableScan(table=[[_DataStreamTable_0]])
      LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
        LogicalTableScan(table=[[_DataStreamTable_1]])
{noformat}
{noformat}
LogicalProject(amount=[*($0, $4)])
  LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
    LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
      LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
        LogicalTableScan(table=[[_DataStreamTable_0]])
        LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
          LogicalTableScan(table=[[_DataStreamTable_1]])
{noformat}
{noformat}
FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
  FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, $2)], joinType=[inner])
    FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
      FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
    FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
      FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
{noformat}
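One way to read the plans above: because the right input is filtered to {{currency = 'Euro'}}, the literal can be substituted for {{$3}} in the top filter. The predicate then references only the left input, so it can be pushed below the join and no longer yields a join key. The sketch below is a toy model of that substitution in plain Scala. It is not Flink or Calcite code, and every name in it ({{Expr}}, {{propagate}}, {{joinKeys}}) is hypothetical and exists only for illustration.

```scala
// Toy model only: not Flink or Calcite code. All names here are hypothetical.
object ConstantPropagationSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: String) extends Expr

  // An equality predicate such as r.currency = o.currency.
  final case class Eq(left: Expr, right: Expr)

  // Replace every column that is known to be constant with its literal value.
  def propagate(cond: Eq, constants: Map[String, String]): Eq = {
    def subst(e: Expr): Expr = e match {
      case Col(n) if constants.contains(n) => Lit(constants(n))
      case other                           => other
    }
    Eq(subst(cond.left), subst(cond.right))
  }

  // Only column-to-column equalities can serve as equi-join keys.
  def joinKeys(conds: Seq[Eq]): Seq[(String, String)] =
    conds.collect { case Eq(Col(l), Col(r)) => (l, r) }

  def main(args: Array[String]): Unit = {
    // WHERE r.currency = o.currency
    val where = Eq(Col("r.currency"), Col("o.currency"))
    // Known from the filter on the right input: currency === "Euro"
    val known = Map("r.currency" -> "Euro")

    val simplified = propagate(where, known)

    println(joinKeys(Seq(where)))      // one key pair before simplification
    println(joinKeys(Seq(simplified))) // no key pairs afterwards
  }
}
```

In this model the simplified predicate is {{'Euro' = o.currency}}, which mirrors the {{=(_UTF-16LE'Euro', $1)}} filter in the second plan, and the empty key list corresponds to the {{Found []}} part of the exception message.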



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
