flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From uybhatti <...@git.apache.org>
Subject [GitHub] flink pull request #4635: [FLINK-7571] [table] Fix translation of TableSourc...
Date Tue, 19 Sep 2017 11:16:22 GMT
Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4635#discussion_r139661930
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
---
    @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
     class StreamTableSourceTable[T](
         override val tableSource: TableSource[T],
         override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
    -  extends TableSourceTable[T](tableSource, statistic) {
    -
    +  extends TableSourceTable[T](
    +    tableSource,
    +    StreamTableSourceTable.adjustFieldIndexes(tableSource),
    +    StreamTableSourceTable.adjustFieldNames(tableSource),
    +    statistic) {
     
       override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
    +    val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)
    +
         val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
    +    flinkTypeFactory.buildLogicalRowType(
    +      this.fieldNames,
    +      fieldTypes)
    +  }
     
    -    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
    -    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
    +}
     
    -    val fields = fieldNames.zip(fieldTypes)
    +object StreamTableSourceTable {
    +
    +  private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
    +    val (rowtime, proctime) = getTimeIndicators(tableSource)
    +
    +    val original = TableEnvironment.getFieldIndices(tableSource)
    +
    +    // append rowtime marker
    +    val withRowtime = if (rowtime.isDefined) {
    +      original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER
    --- End diff --
    
    Thanks, Actually we can define existing field as the rowtime field, when we convert DataStream
to Table. That's why I was little bit confused, but we have PR for TableSource then it's good.


---

Mime
View raw message