flink-issues mailing list archives

From "Wenlong Lyu (Jira)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-22454) Failed to translate Lookup Join when join on a CAST expression on dimention table column
Date Thu, 20 May 2021 06:00:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-22454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348075#comment-17348075 ]

Wenlong Lyu commented on FLINK-22454:
-------------------------------------

Hi [~fsk119], I think this error is by design. A lookup join must have lookup keys in its join
condition. When extracting lookup keys, we ignore a cast on a field reference (see CommonPhysicalLookupJoin#getIdenticalSourceField)
in order to support lookup keys that have interoperable types (see PlannerTypeUtils#isInteroperable),
and we added a check on the lookup key types to make sure that they really are interoperable.


In your case, the cast cannot be ignored (INT and DECIMAL are not interoperable), so the planner
throws an error about incompatible types on the lookup key.
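
To illustrate the check described above, here is a minimal, self-contained sketch. It is a toy model, not the planner's actual code: the class LookupKeyCheckSketch, the simplified TypeRoot enum, KeyExpr and isValidLookupKey are all hypothetical names, and the isInteroperable rule here (identical type roots, or two character string types) is an assumption standing in for PlannerTypeUtils#isInteroperable.

```java
public class LookupKeyCheckSketch {

    // Simplified stand-in for logical type roots (no precision or nullability).
    enum TypeRoot { INT, DECIMAL, CHAR, VARCHAR }

    // Dimension-table side of an equality condition: a column, optionally wrapped in a CAST.
    static final class KeyExpr {
        final String column;
        final TypeRoot columnType;
        final TypeRoot castTarget; // null when the column is referenced without a CAST

        KeyExpr(String column, TypeRoot columnType, TypeRoot castTarget) {
            this.column = column;
            this.columnType = columnType;
            this.castTarget = castTarget;
        }
    }

    static boolean isCharacterString(TypeRoot t) {
        return t == TypeRoot.CHAR || t == TypeRoot.VARCHAR;
    }

    // Assumption: identical roots, or two character string types, count as interoperable.
    static boolean isInteroperable(TypeRoot a, TypeRoot b) {
        return a == b || (isCharacterString(a) && isCharacterString(b));
    }

    // The CAST is ignored when the key is extracted, so the probe-side type is
    // compared with the column's declared type, not with castTarget.
    static boolean isValidLookupKey(TypeRoot probeType, KeyExpr dimSide) {
        return isInteroperable(probeType, dimSide.columnType);
    }

    public static void main(String[] args) {
        // MyTable.a (INT) = CAST(LookupTable.id AS INT), with id declared as DECIMAL
        KeyExpr castedId = new KeyExpr("id", TypeRoot.DECIMAL, TypeRoot.INT);
        System.out.println("INT vs CAST(id AS INT): "
                + isValidLookupKey(TypeRoot.INT, castedId));

        // A VARCHAR probe column = CAST(name AS VARCHAR), with name declared as CHAR
        KeyExpr castedName = new KeyExpr("name", TypeRoot.CHAR, TypeRoot.VARCHAR);
        System.out.println("VARCHAR vs CAST(name AS VARCHAR): "
                + isValidLookupKey(TypeRoot.VARCHAR, castedName));
    }
}
```

In this model the first case is rejected, because once the CAST is dropped the comparison is INT against DECIMAL, while the second case is accepted, because CHAR and VARCHAR are treated as interoperable.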

> Failed to translate Lookup Join when join on a CAST expression on dimention table column
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-22454
>                 URL: https://issues.apache.org/jira/browse/FLINK-22454
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.0
>            Reporter: Shengkai Fang
>            Priority: Major
>
> Please add test in {{LookupJoinTest}}
> {code:java}
>   def before(): Unit = {
>     util.addDataStream[(Int, String, Long)](
>       "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
>     if (legacyTableSource) {
>       TestTemporalTable.createTemporaryTable(util.tableEnv, "LookupTable")
>     } else {
>       util.addTable(
>         """
>           |CREATE TABLE LookupTable (
>           |  `id` DECIMAL(38, 10),
>           |  `to_qty` DECIMAL(38, 10),
>           |  `name` STRING,
>           |  `age` INT,
>           |  `id_int` as CAST(`id` AS INT)
>           |) WITH (
>           |  'connector' = 'values'
>           |)
>           |""".stripMargin)
>     }
>   }
> {code}
> {code:java}
> @Test
>   def test(): Unit = {
>     val sql =
>     """
>     |SELECT MyTable.b, LookupTable.`to_qty`
>     |FROM MyTable
>     |LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF MyTable.`proctime`
>     |ON MyTable.a = CAST(LookupTable.`id` as INT)
>     |""".stripMargin
>     util.tableEnv.sqlQuery(sql).explain()
>   }
> {code}
> The exception stack is 
> {code}
> org.apache.flink.table.api.TableException: Temporal table join requires equivalent condition of the same type, but the condition is a[INT]=id[DECIMAL(38, 10)]
> 	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validateLookupKeyType(CommonExecLookupJoin.java:303)
> 	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:222)
> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
> 	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
> 	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
> 	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
> 	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
> 	at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
> 	at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691)
> 	at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
> 	at org.apache.flink.table.planner.plan.stream.sql.join.LookupJoinTest.test(LookupJoinTest.scala:197)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runners.Suite.runChild(Suite.java:128)
> 	at org.junit.runners.Suite.runChild(Suite.java:27)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
> 	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> 	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
