Github user uybhatti commented on a diff in the pull request: https://github.com/apache/flink/pull/4635#discussion_r139661930 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala --- @@ -28,48 +29,113 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo class StreamTableSourceTable[T]( override val tableSource: TableSource[T], override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) - extends TableSourceTable[T](tableSource, statistic) { - + extends TableSourceTable[T]( + tableSource, + StreamTableSourceTable.adjustFieldIndexes(tableSource), + StreamTableSourceTable.adjustFieldNames(tableSource), + statistic) { override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource) + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildLogicalRowType( + this.fieldNames, + fieldTypes) + } - val fieldNames = TableEnvironment.getFieldNames(tableSource).toList - val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList +} - val fields = fieldNames.zip(fieldTypes) +object StreamTableSourceTable { + + private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = { + val (rowtime, proctime) = getTimeIndicators(tableSource) + + val original = TableEnvironment.getFieldIndices(tableSource) + + // append rowtime marker + val withRowtime = if (rowtime.isDefined) { + original :+ TimeIndicatorTypeInfo.ROWTIME_MARKER --- End diff -- Thanks. Actually, we can define an existing field as the rowtime field when we convert a DataStream to a Table. That's why I was a little bit confused, but since there is a PR for TableSource, it's fine. ---