flink-issues mailing list archives

From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Date Fri, 14 Jul 2017 13:55:33 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3829#discussion_r127456022
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
    @@ -488,19 +499,50 @@ abstract class TableEnvironment(val config: TableConfig) {
         *   tEnv.sql(s"SELECT * FROM $table")
         * }}}
         *
    -    * @param query The SQL query to evaluate.
    -    * @return The result of the query as Table.
    +    * @param sql The SQL string to evaluate.
    +    * @return The result of the query as a Table, or null for a DML INSERT operation.
         */
    -  def sql(query: String): Table = {
    +  def sql(sql: String): Table = {
         val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
         // parse the sql query
    -    val parsed = planner.parse(query)
    +    val parsed = planner.parse(sql)
         // validate the sql query
         val validated = planner.validate(parsed)
    -    // transform to a relational tree
    -    val relational = planner.rel(validated)
     
    -    new Table(this, LogicalRelNode(relational.rel))
    +    // Translate the source separately for a DML INSERT, because a sink operation has no
    +    // output, while every RelNode translation must return a DataSet/DataStream and Calcite's
    +    // TableModify defines the output of DML (INSERT, DELETE, UPDATE) as a RowCount column of
    +    // BIGINT type. This is a workaround until the output type of a sink operation (as opposed
    +    // to a traditional SQL INSERT) is settled.
    +    if (parsed.isInstanceOf[SqlInsert]) {
    +      val insert = parsed.asInstanceOf[SqlInsert]
    +      // validate sink table
    +      val targetName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
    +      val targetTable = getTable(targetName)
    +      if (null == targetTable || !targetTable.isInstanceOf[TableSinkTable[_]]) {
    +        throw new TableException("SQL DML INSERT operation needs a registered TableSink table!")
    +      }
    +      // validate unsupported partial insertion to sink table
    +      val sinkTable = targetTable.asInstanceOf[TableSinkTable[_]]
    +      if (null != insert.getTargetColumnList &&
    +        insert.getTargetColumnList.size() != sinkTable.fieldTypes.length) {
    +        throw new TableException(
    +          "SQL DML INSERT does not support inserting a subset of the target table's " +
    +            "columns, because column nullability is not defined for now!")
    +      }
    +
    +      writeToSink(
    +        new Table(this, LogicalRelNode(planner.rel(insert.getSource).rel)),
    +        sinkTable.tableSink,
    +        QueryConfig.getQueryConfigFromTableEnv(this)
    --- End diff --
    
    This will return the default configuration. If we move `INSERT INTO` queries into a special
method (like `sqlInsert()`), the method could have an additional `QueryConfig` parameter.
    
    Otherwise, users won't be able to tweak the execution of a streaming query.
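
    The suggested split could be sketched as follows. This is a minimal, self-contained illustration with hypothetical names (`TableEnvSketch`, a simplified `QueryConfig`), not the actual Flink API: a dedicated `sqlInsert()` entry point accepts an explicit `QueryConfig`, so streaming users are not stuck with the default configuration.

    ```scala
    // Hypothetical sketch, not the real Flink API: a dedicated sqlInsert()
    // that threads a caller-supplied QueryConfig into sink translation.
    case class QueryConfig(idleStateRetentionMillis: Long = 0L)

    class TableEnvSketch {
      // Plain queries keep going through sql().
      def sql(query: String): String = s"select plan: $query"

      // INSERT INTO statements get their own method with a config parameter,
      // defaulting to the environment's default configuration.
      def sqlInsert(query: String, config: QueryConfig = QueryConfig()): String = {
        require(query.trim.toUpperCase.startsWith("INSERT"),
          "sqlInsert() expects an INSERT statement")
        s"insert plan (idle state retention = ${config.idleStateRetentionMillis} ms): $query"
      }
    }
    ```

    With such a split, a streaming user could pass a per-query configuration, e.g. `tEnv.sqlInsert("INSERT INTO sink SELECT ...", QueryConfig(3600000L))`, instead of always receiving the defaults.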

