flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
Date Mon, 17 Jul 2017 10:26:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089603#comment-16089603 ]

ASF GitHub Bot commented on FLINK-6442:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3829
  
    Thanks for the reply @lincoln-lil.
    
    I agree, it is possible for a `TableSink` to define its schema when it is created instead of when it is configured by the optimizer. However, I think mixing these modes obfuscates the interface and means that the way a `TableSink` can be used (with registration or without) depends on its implementation. IMO, a (correctly implemented) `TableSink` should be usable at any place where it can be passed to the Table API. From my point of view, mixing both modes in the same interface would be bad API design. Actually, I think the registration mode of defining the schema up front is very valuable because it ensures that the output of the query has a correct schema.
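    
    To make the distinction concrete, here is a rough sketch of the two modes (the trait and class names are illustrative assumptions, not the actual flink-table interface):
    
    ```scala
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.types.Row
    
    // Hypothetical sketch of the two schema modes under discussion.
    trait ConfigurableSink[T] {
      // "Derived" mode: the optimizer pushes the schema of the query result
      // into the sink when the query is translated.
      def configure(fieldNames: Array[String],
                    fieldTypes: Array[TypeInformation[_]]): ConfigurableSink[T]
    }
    
    // "Registration" mode: the schema is fixed when the sink is created, so
    // the API can validate the query result against it instead of adapting.
    class PreConfiguredSink(
        fieldNames: Array[String],
        fieldTypes: Array[TypeInformation[_]]) extends ConfigurableSink[Row] {
    
      override def configure(
          names: Array[String],
          types: Array[TypeInformation[_]]): ConfigurableSink[Row] = {
        // Fail early if the query schema does not match the declared schema.
        require(names.sameElements(fieldNames) && types.sameElements(fieldTypes),
          "Query schema does not match the schema of the registered sink.")
        this
      }
    }
    ```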
    
    Regarding the method for specifying the query, I'm very much in favor of providing a separate method. I don't care so much about the name; `sqlInsert()` is just a proposal. However, I think the usage of a `SELECT` query is completely different from that of an `INSERT INTO SELECT` query. This is reflected by the different return type and the need to provide a `QueryConfig`.
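    
    As a sketch of the split I have in mind (the method names are placeholders, not a final API):
    
    ```scala
    import org.apache.flink.table.api.{QueryConfig, Table}
    
    // Proposal sketch: separate entry points for the two kinds of queries.
    trait SqlMethods {
      // A SELECT query returns a Table that the caller keeps working with.
      def sql(query: String): Table
    
      // An INSERT INTO ... SELECT returns nothing to the caller and needs a
      // QueryConfig (e.g., state retention settings for streaming queries).
      def sqlInsert(query: String, config: QueryConfig): Unit
    }
    ```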
    
    Best, Fabian


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-6442
>                 URL: https://issues.apache.org/jira/browse/FLINK-6442
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> Currently the Table API only offers registration methods for source tables. When we write a streaming job in SQL, we have to wire up the sink separately, just as we would with the Table API:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke the Table API's writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to a DataStream first and then invoke addSink
> val stream = tEnv.sql(sqlQuery).toDataStream[Row]
> stream.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see that the sink table is always a derived table, because its 'schema' is inferred from the result type of the upstream query.
> By contrast, a traditional RDBMS, which supports DML syntax, lets a query with a target output be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
>     tEnv.registerTableSink("targetTable", new YourSink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
>     val result = tEnv.sql(sql)
> {code}
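> A concrete variant of the same sketch, with Flink's built-in `CsvTableSink` standing in for `YourSink` (note that `registerTableSink` is the method proposed by this issue, not part of a released API yet):
> {code}
> import org.apache.flink.table.sinks.CsvTableSink
>
> // CsvTableSink writes rows as delimited text files; it stands in for
> // the YourSink placeholder above.
> val csvSink = new CsvTableSink("/tmp/results", ",")
> tEnv.registerTableSink("targetTable", csvSink)
> tEnv.sql("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
> {code}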
> It is supported by Calcite’s grammar: 
> {code}
> insert:
>   ( INSERT | UPSERT ) INTO tablePrimary
>   [ '(' column [, column ]* ')' ]
>   query
> {code}
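> For example, this grammar accepts a statement with an explicit column list:
> {code}
> INSERT INTO targetTable (a, b, c)
> SELECT a, b, c FROM sourceTable
> {code}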
> I'd like to extend the Flink Table API to support such a feature. See the design doc: https://goo.gl/n3phK5



