flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7571) Execution of TableSources with Time Indicators fails
Date Tue, 19 Sep 2017 08:59:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171351#comment-16171351
] 

ASF GitHub Bot commented on FLINK-7571:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4635#discussion_r139633811
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
---
    @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
     class StreamTableSourceTable[T](
         override val tableSource: TableSource[T],
         override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
    -  extends TableSourceTable[T](tableSource, statistic) {
    -
    +  extends TableSourceTable[T](
    +    tableSource,
    +    StreamTableSourceTable.adjustFieldIndexes(tableSource),
    +    StreamTableSourceTable.adjustFieldNames(tableSource),
    +    statistic) {
     
       override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
    +    val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)
    +
         val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
    +    flinkTypeFactory.buildLogicalRowType(
    +      this.fieldNames,
    +      fieldTypes)
    +  }
     
    -    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
    -    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
    +}
     
    -    val fields = fieldNames.zip(fieldTypes)
    +object StreamTableSourceTable {
    +
    +  private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
    +    val (rowtime, proctime) = getTimeIndicators(tableSource)
    +
    +    val original = TableEnvironment.getFieldIndices(tableSource)
    +
    +    // append rowtime marker
    +    val withRowtime = if (rowtime.isDefined) {
    +      original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
    --- End diff --
    
    That's how rowtime fields are handled at the moment. They are always appended at the end
of the row.
    
    There is a JIRA to use existing fields as rowtime fields (https://issues.apache.org/jira/browse/FLINK-7446)
but this has not been implemented yet. I'm currently working on a PR for that.


> Execution of TableSources with Time Indicators fails
> ----------------------------------------------------
>
>                 Key: FLINK-7571
>                 URL: https://issues.apache.org/jira/browse/FLINK-7571
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Critical
>
> The translation of queries that include a TableSource with time indicators fails during
the code generation because field names and field indicies are not adjusted for the time indicators.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message