flink-issues mailing list archives

From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Date Fri, 14 Jul 2017 13:55:33 GMT
Github user fhueske commented on a diff in the pull request:

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
    @@ -488,19 +499,50 @@ abstract class TableEnvironment(val config: TableConfig) {
         *   tEnv.sql(s"SELECT * FROM $table")
         * }}}
    -    * @param query The SQL query to evaluate.
    -    * @return The result of the query as Table.
    +    * @param sql The SQL string to evaluate.
    +    * @return The result of the query as Table, or null for a DML insert operation.
    -  def sql(query: String): Table = {
    +  def sql(sql: String): Table = {
         val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
         // parse the sql query
    -    val parsed = planner.parse(query)
    +    val parsed = planner.parse(sql)
         // validate the sql query
         val validated = planner.validate(parsed)
    -    // transform to a relational tree
    -    val relational = planner.rel(validated)
    -    new Table(this, LogicalRelNode(relational.rel))
    +    // Translate a DML INSERT separately, since a sink operation has no output, but all
    +    // RelNode translations should return a DataSet/DataStream, and Calcite's TableModify
    +    // defines the output of DML (INSERT, DELETE, UPDATE) as a RowCount column of BigInt type.
    +    // This is a workaround until the output type of a sink is settled
    +    // (compared to a traditional SQL insert).
    +    if (parsed.isInstanceOf[SqlInsert]) {
    +      val insert = parsed.asInstanceOf[SqlInsert]
    +      // validate sink table
    +      val targetName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
    +      val targetTable = getTable(targetName)
    +      if (null == targetTable || !targetTable.isInstanceOf[TableSinkTable[_]]) {
    +        throw new TableException("SQL DML INSERT operation needs a registered TableSink.")
    +      }
    +      // validate unsupported partial insertion to sink table
    +      val sinkTable = targetTable.asInstanceOf[TableSinkTable[_]]
    +      if (null != insert.getTargetColumnList && insert.getTargetColumnList.size() !=
    +        sinkTable.fieldTypes.length) {
    --- End diff --
    I think there is a mismatch between the `TableSink` interface and how it is used as an output table here. An instance of `TableSink` has no predefined columns but receives them when it is configured (`TableSink.configure()`) before it emits a Table.
    In the current state, a `TableSink` is configured to emit data according to the query result, i.e., the schema of the emitted table depends on the query result. However, for `INSERT INTO` the table schema is defined beforehand, and Calcite validates that the schema of the query is compliant with the schema of the table to write to.
    I think we should change the `TableSink` interface, but right now I don't think we should use `TableSink` like that.
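    The mismatch described above can be sketched as follows. This is a simplified stand-in, not Flink's actual `TableSink` API: the trait, the `CsvLikeSink` class, and all field names are hypothetical, introduced only to illustrate the configure-on-emit pattern, where a sink has no schema of its own until the planner copies the query result's schema into it.

    ```scala
    // Hedged sketch (not the real Flink API): a sink starts with no predefined
    // columns and only receives a schema when configure() is called with the
    // field names/types of the query result.
    trait ConfigurableSink {
      def fieldNames: Option[Seq[String]]
      def configure(names: Seq[String], types: Seq[String]): ConfigurableSink
    }

    // Hypothetical CSV-like sink: the output path is fixed up front, the schema is not.
    case class CsvLikeSink(path: String,
                           fieldNames: Option[Seq[String]] = None,
                           fieldTypes: Option[Seq[String]] = None) extends ConfigurableSink {
      override def configure(names: Seq[String], types: Seq[String]): ConfigurableSink =
        copy(fieldNames = Some(names), fieldTypes = Some(types))
    }

    object SinkSchemaDemo extends App {
      val sink = CsvLikeSink("/tmp/result.csv")
      // Before configuration there is nothing to validate an INSERT INTO against:
      assert(sink.fieldNames.isEmpty)

      // The planner would call configure() with the *query's* output schema, so the
      // sink's schema always trails the query instead of constraining it beforehand.
      val configured = sink.configure(Seq("word", "cnt"), Seq("STRING", "LONG"))
      println(configured.fieldNames.get.mkString(","))  // word,cnt
    }
    ```

    Under this pattern, the `INSERT INTO` validation in the diff has no stable schema to check against, which is why the comment argues for changing the interface before using `TableSink` as an insert target.
    
    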
