flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
Date Tue, 25 Jul 2017 07:46:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099653#comment-16099653 ]
ASF GitHub Bot commented on FLINK-6442:

Github user lincoln-lil commented on the issue:

    @fhueske Agree with you. I've done most of the work, but after deprecating the
`tableEnv.sql` method and modifying all the relevant test cases, I found a problem when
validating the input SQL string in the `sqlSelect` method: we would have to enumerate every
kind of `SqlNode` other than `SqlDelete` / `SqlInsert` and the DML SQL operations, because a
query such as `select * from A union select * from B` does not parse to a `SqlSelect`.
    Do we provide too many variants of the sql method?
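    To make the validation problem concrete, here is a minimal, self-contained sketch in
plain Scala (no Calcite dependency; all type names are simplified stand-ins for the parser's
node classes): a UNION parses to a call node rather than a `SqlSelect`, so checking the node
*class* rejects valid queries, while checking the statement *kind* accepts every query shape.

```scala
// Simplified stand-ins for parser node types (not the real Calcite classes).
sealed trait SqlNode { def kind: String }
case class SqlSelect(kind: String = "SELECT") extends SqlNode
case class SqlBasicCall(kind: String) extends SqlNode   // a UNION parses to a call node
case class SqlInsert(kind: String = "INSERT") extends SqlNode

object Validator {
  // Kinds that denote a query (cf. Calcite's SqlKind.QUERY set).
  private val queryKinds = Set("SELECT", "UNION", "INTERSECT", "EXCEPT", "VALUES")

  def isQuery(node: SqlNode): Boolean = queryKinds.contains(node.kind)
}

object Demo extends App {
  // select * from A union select * from B
  val union = SqlBasicCall("UNION")
  println(union.isInstanceOf[SqlSelect])  // false: class check rejects a valid query
  println(Validator.isQuery(union))       // true: kind check accepts it
}
```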
    I also had a look at the JDBC API; `PreparedStatement` provides the following SQL
execution methods (see the `PreparedStatement` Javadoc):
    - `boolean execute()` Executes the SQL statement in this `PreparedStatement` object,
which may be any kind of SQL statement.
    - `ResultSet executeQuery()` Executes the SQL query in this `PreparedStatement` object
and returns the `ResultSet` object generated by the query.
    - `int executeUpdate()` Executes the SQL statement in this `PreparedStatement` object,
which must be an SQL Data Manipulation Language (DML) statement, such as INSERT, UPDATE or
DELETE; or an SQL statement that returns nothing, such as a DDL statement.
    Since JDBC is well known to most Java developers, I think it is a good reference for our
API design.
    I propose to provide the corresponding three methods:
    - `sql(sql: String): Option[Table]` This PR keeps the current `sql(sql: String): Table`
method and deprecates it; I will open another JIRA to implement the new `sql` method, which
will support any kind of SQL string (even a mixed string containing several types of SQL
statements) and return an `Option[Table]` or some other value (to be discussed later).
    - `sqlQuery(sql: String): Table`
    - `sqlUpdate(sql: String): Option[Long]` The return type leaves room for returning the
affected row count in the future, which is more meaningful than `Unit`.
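    A minimal sketch of how the proposed split could look (plain Scala stubs; `Table`, the
prefix-based `isQuery` check, and the class name `TableEnvSketch` are illustrative
assumptions, not the actual Flink implementation):

```scala
case class Table(name: String)

class TableEnvSketch {
  private def isQuery(sql: String): Boolean =
    Set("SELECT", "VALUES").exists(sql.trim.toUpperCase.startsWith)

  // query only: always produces a Table
  def sqlQuery(sql: String): Table =
    if (isQuery(sql)) Table("result")
    else throw new IllegalArgumentException(s"not a query: $sql")

  // DML only: Option[Long] leaves room for an affected-row count later
  def sqlUpdate(sql: String): Option[Long] =
    if (!isQuery(sql)) None  // row count not yet available
    else throw new IllegalArgumentException(s"not an update: $sql")

  // any statement: yields a Table only when the statement was a query
  def sql(sql: String): Option[Table] =
    if (isQuery(sql)) Some(sqlQuery(sql)) else { sqlUpdate(sql); None }
}
```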
    I will change the existing test cases from the `sql` method to the `sqlQuery` method,
update the documentation, and explain the relationship with the JDBC methods.
    What do you think?
    Best, Lincoln

> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
> -------------------------------------------------------------------------------
>                 Key: FLINK-6442
>                 URL: https://issues.apache.org/jira/browse/FLINK-6442
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
> Currently the Table API only has registration methods for source tables, so when we use
SQL to write a streaming job we have to wire up the sink separately through the Table API:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because its 'schema'
is inferred from the result type of the upstream query.
> In a traditional RDBMS that supports DML syntax, by contrast, a query with a target output
can be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
>     tEnv.registerTableSink("targetTable", new YourSink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
>     val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support this feature. See the design doc: https://goo.gl/n3phK5

This message was sent by Atlassian JIRA
