flink-issues mailing list archives

From "Kent Murra (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
Date Wed, 20 Sep 2017 22:48:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173959#comment-16173959 ]

Kent Murra commented on FLINK-7657:
-----------------------------------

I've noticed that the scope of the problem is larger than what I originally reported.  For example,
integer literals are carried as BigDecimal values: when running the test cases I see Literals that
have a BigDecimal value and a BasicTypeInfo.INT_TYPE_INFO type.  This tends to work because the
underlying engine treats Integer and BigDecimal as comparable.  My concern is that it does not match
the documentation, so people implementing FilterableTableSource may be surprised by it.
At the same time, fixing the issue might break people who depend on the current behavior.
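
To make the surprise concrete, here is a minimal sketch that does not use Flink at all.  FakeLiteral
is a hypothetical stand-in for a pushed-down literal (a raw value plus a declared type name), and both
helper methods are illustrative names of my own, not part of any Flink API.

```scala
object LiteralValueMismatch {
  // Hypothetical stand-in for a pushed-down literal: a raw value plus the
  // name of the type it claims to have. This is NOT Flink's Literal class.
  final case class FakeLiteral(value: Any, declaredType: String)

  // What an implementer might write after trusting the declared INT type.
  // Throws ClassCastException when the value is actually a BigDecimal.
  def naiveIntValue(lit: FakeLiteral): Int = lit.value.asInstanceOf[Int]

  // Defensive alternative: both java.lang.Integer and java.math.BigDecimal
  // extend java.lang.Number, so go through that common supertype instead.
  def safeIntValue(lit: FakeLiteral): Int = lit.value match {
    case n: java.lang.Number => n.intValue()
    case other =>
      throw new IllegalArgumentException("Not a numeric literal: " + other)
  }
}
```

With a literal such as FakeLiteral(new java.math.BigDecimal(42), "INT"), the naive cast fails while
the Number-based accessor returns 42, which is roughly the situation an implementer would run into.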

For testing purposes, I think the ideal approach is to have the literal also encode its type
so that the test cases can verify types as well as values.  However, the scope of such
a change is very large, and I'm not sure the maintainers would want to review such a patch.
I can limit the scope of my change, but my concern is that it would not be a *good* fix, more
of a duct-tape patch.
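
As a rough illustration of what I mean by type verification, a test helper could pair each literal
value with the runtime class its declared type promises and check that the two agree.  TypedLiteral
and isConsistent are hypothetical names for this sketch only; the real change would have to live in
Flink's Literal and its test utilities.

```scala
object TypedLiteralSketch {
  // Hypothetical: a literal value together with the runtime class that its
  // declared type is supposed to guarantee.
  final case class TypedLiteral(value: Any, expected: Class[_]) {
    // True when the value is null or really is an instance of the expected class.
    def isConsistent: Boolean = value == null || expected.isInstance(value)
  }
}
```

A test could then assert isConsistent on every literal it inspects, so a BigDecimal declared as an
Integer would fail the check instead of slipping through on value comparison alone.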

Can I get some guidance on the approach I should take?  

Let me know if there are any questions surrounding this.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7657
>                 URL: https://issues.apache.org/jira/browse/FLINK-7657
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Kent Murra
>            Assignee: Kent Murra
>            Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When the execution
> environment tries to optimize the SQL, it throws an exception (stack trace below).  As a result,
> any SQL query with a timestamp, date, or time literal cannot be executed if any table source
> implements FilterableTableSource.
> {code:none} 
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> 	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
> 	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
> 	at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
> 	at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
> 	at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
> 	at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
> 	at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> 	at scala.App$class.main(App.scala:76)
> 	at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
> 	at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
> 	at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
> 	at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
> 	at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
> 	at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.map(List.scala:285)
> 	at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
> 	... 19 more
> {code}
> I've done quite a bit of debugging on this and tracked it down to a problem with the
> way a Calcite AST is translated into an Expression tree for the predicates. Calcite parses
> timestamps as Calendar values, and you'll note in [RexNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160]
> that a Calendar value is passed as-is to the [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54],
> which does no conversion of the value. The Literal, in turn, [expects the value to be a java.util.Date subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106],
> which is where the exception arises.
> I've done some informal testing of a bugfix where I convert the calendars to java.sql.Date/java.sql.Time/java.sql.Timestamp
> in RexNodeToExpressionConverter and had good results. Here is some reproduction code in Scala.
> I am using Flink version 1.3.2 and running it in local mode (Right-click + Run-as in IntelliJ).

> {code:none}
> package kmurra
> import java.sql.Date
> import java.util
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
> import org.apache.flink.api.java
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.ExecutionEnvironment
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala.BatchTableEnvironment
> import org.apache.flink.table.expressions.Expression
> import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
> import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
> import org.apache.flink.types.Row
> import scala.collection.mutable.ListBuffer
> import scala.collection.JavaConversions._
> object TestReproductionApp extends App {
>   val tables: BatchTableEnvironment = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
>   val source = new TestTableSource
>   val sink = new PrintTableSink()
>   tables.registerTableSource("test_table", source)
>   tables.sql("SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink)
> }
> class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
>   def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
>   def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
>   protected def copy: TableSinkBase[Row] = new PrintTableSink()
> }
> class TestTableSource(val isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with FilterableTableSource[Row] {
>   val getReturnType: RowTypeInfo = {
>     val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
>     val fieldNames = Array("data", "last_updated")
>     new RowTypeInfo(typeInfo, fieldNames)
>   }
>   def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = new TestTableSource(true)
>   def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
>     execEnv.fromCollection({
>       val data = ListBuffer[Row]()
>       data += row("Success!", Date.valueOf("2017-09-01"))
>       data += row("Failure!", Date.valueOf("2017-01-01"))
>       data
>     })
>   }
>   def row(data: String, lastUpdated: Date): Row = {
>     val row = new Row(2)
>     row.setField(0, data)
>     row.setField(1, lastUpdated)
>     row
>   }
> }
> {code}
> The build system is SBT:
> {code:none} 
> name := "kmurra-flink-reproduction"
> organization := "kmurra" 
> version := "1.0"
> scalaVersion := "2.11.8"
> resolvers ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/", Resolver.mavenLocal)
> val flinkVersion = "1.3.2"
> libraryDependencies ++= Seq(
>   "org.apache.flink"     %% "flink-scala"             % flinkVersion,//     % "provided",
>   "org.apache.flink"     %% "flink-table"             % flinkVersion,//     % "provided",
>   "org.apache.flink"     %% "flink-avro"              % flinkVersion,//    % "provided",
>   "org.apache.flink"     %% "flink-streaming-scala"   % flinkVersion,//    % "provided",
>   "org.apache.flink"     %  "flink-jdbc"              % flinkVersion //     % "provided"
> )
> assemblyMergeStrategy in assembly := {
>   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
>   case x => MergeStrategy.first
> }
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
> {code}
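
For reference, the conversion described in the quoted issue (turning Calendar values into
java.sql.Date/java.sql.Time/java.sql.Timestamp before they reach the Literal) could look roughly like
the sketch below.  SqlTimeKind and toSqlValue are hypothetical names, and this is not the actual
patch; it only shows the shape of the idea using standard JDK classes.

```scala
import java.util.Calendar

object CalendarLiteralConversion {
  // Hypothetical tag for which SQL time type the literal is declared as.
  sealed trait SqlTimeKind
  case object DateKind extends SqlTimeKind
  case object TimeKind extends SqlTimeKind
  case object TimestampKind extends SqlTimeKind

  // Wrap the calendar's epoch milliseconds in the matching java.sql type.
  // All three result types extend java.util.Date, which is what the cast in
  // the stack trace above requires.
  def toSqlValue(cal: Calendar, kind: SqlTimeKind): java.util.Date = kind match {
    case DateKind      => new java.sql.Date(cal.getTimeInMillis)
    case TimeKind      => new java.sql.Time(cal.getTimeInMillis)
    case TimestampKind => new java.sql.Timestamp(cal.getTimeInMillis)
  }
}
```

Note that this sketch copies the epoch milliseconds unchanged.  If the Calendar and the JVM default
time zone differ, a real fix would probably need an offset adjustment, which I have left out here.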



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
