flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5827) Exception when do filter after join a udtf which returns a POJO type
Date Thu, 02 Mar 2017 15:22:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15892406#comment-15892406 ]

ASF GitHub Bot commented on FLINK-5827:
---------------------------------------

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3357
  
    Thanks @kaibozhou. I will merge this.


> Exception when do filter after join a udtf which returns a POJO type
> --------------------------------------------------------------------
>
>                 Key: FLINK-5827
>                 URL: https://issues.apache.org/jira/browse/FLINK-5827
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> The test case:
> {code:title=testFilterUdtfOutputPojo} 
>   @Test
>   def testFilterUdtfOutputPojo(): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val pojoFunc1 = new PojoTableFunc()
>     tEnv.registerFunction("pojo1", pojoFunc1)
>     val result = CollectionDataSets.getSmall3TupleDataSet(env)
>       .toTable(tEnv, 'a, 'b, 'c)
>       .join(pojoFunc1('c))
>       .where(('age > 0) && ('name !== ""))
>       .select('a, 'b, 'c, 'age, 'name)
>     val results = result.toDataSet[Row].collect()
>   }
> {code}
> It throws the following exception:
> {code}
> org.apache.flink.table.codegen.CodeGenException: No input mapping is specified for input1 of type POJO.
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$1.apply(CodeGenerator.scala:80)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.flink.table.codegen.CodeGenerator.<init>(CodeGenerator.scala:79)
>   at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:191)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.generateCollector(DataSetCorrelate.scala:37)
>   at org.apache.flink.table.plan.nodes.CommonCorrelate$class.correlateMapFunction(CommonCorrelate.scala:70)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.correlateMapFunction(DataSetCorrelate.scala:37)
>   at org.apache.flink.table.plan.nodes.dataset.DataSetCorrelate.translateToPlan(DataSetCorrelate.scala:101)
>   at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:277)
>   at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:256)
>   at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:140)
>   at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:40)
>   at org.apache.flink.table.api.scala.stream.table.UserDefinedTableFunctionTest.testFilterUdtfOutputPojo(UserDefinedTableFunctionTest.scala:399)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
