flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
Date Tue, 10 Jul 2018 11:29:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538426#comment-16538426 ]

ASF GitHub Bot commented on FLINK-8558:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6264#discussion_r201307506
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
    @@ -110,16 +111,44 @@ abstract class BatchTableEnvironment(
         }
       }
     
    -// TODO expose this once we have enough table source factories that can deal with it
    -//  /**
    -//    * Creates a table from a descriptor that describes the source connector, source encoding,
    -//    * the resulting table schema, and other properties.
    -//    *
    -//    * @param connectorDescriptor connector descriptor describing the source of the table
    -//    */
    -//  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
    -//    new BatchTableSourceDescriptor(this, connectorDescriptor)
    -//  }
    +  /**
    +    * Creates a table from a descriptor that describes the source connector, the source format,
    +    * the resulting table schema, and other properties.
    +    *
    +    * Descriptors allow for declaring communication to external systems in an
    +    * implementation-agnostic way. The classpath is scanned for connectors and matching connectors
    +    * are configured accordingly.
    +    *
    +    * The following example shows how to read from a Kafka connector using a JSON format and
    +    * creating a table:
    +    *
    +    * {{{
    +    *
    +    * tableEnv
    +    *   .from(
    +    *     new Kafka()
    +    *       .version("0.11")
    +    *       .topic("clicks")
    +    *       .property("zookeeper.connect", "localhost")
    +    *       .property("group.id", "click-group")
    +    *       .startFromEarliest())
    +    *   .withFormat(
    +    *     new Json()
    +    *       .jsonSchema("{...}")
    +    *       .failOnMissingField(false))
    +    *   .withSchema(
    +    *     new Schema()
    +    *       .field("user-name", "VARCHAR").from("u_name")
    +    *       .field("count", "DECIMAL")
    +    *       .field("proc-time", "TIMESTAMP").proctime())
    +    *   .toTable()
    +    * }}}
    +    *
    +    * @param connectorDescriptor connector descriptor describing the source of the table
    +    */
    +  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
    --- End diff --
    
    Once we reimplement the environments in Java and introduce proper interfaces for hiding the implementation, this problem will be gone anyway.


> Add unified format interfaces and format discovery
> --------------------------------------------------
>
>                 Key: FLINK-8558
>                 URL: https://issues.apache.org/jira/browse/FLINK-8558
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently only {{flink-avro}} is located there, but we will add more formats such as {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of concerns, we want to decouple connectors from formats, e.g., remove {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to discover available formats in the classpath (similar to how file systems are discovered now). A {{Format}} will provide a method for converting {{byte[]}} to the target record type.
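The discovery and decoupling idea in the issue description can be sketched in Scala. This is a minimal sketch under stated assumptions, not Flink's actual API: the names `Format`, `FormatFactory`, `FormatDiscovery`, `ByteSource`, `Utf8StringFormat`, `Utf8StringFormatFactory`, and the property keys `format.type` and `connector.type` are all hypothetical. The only real library call is `java.util.ServiceLoader.load`, which instantiates the classes listed in `META-INF/services/` files on the classpath.

```scala
import java.nio.charset.StandardCharsets
import java.util.ServiceLoader

// Hypothetical: converts one raw message (byte[]) into a record of type T.
trait Format[+T] {
  def deserialize(message: Array[Byte]): T
}

// Hypothetical: a factory advertises the properties it matches and
// builds a configured Format from the full property map.
trait FormatFactory {
  def requiredContext: Map[String, String]
  def createFormat(properties: Map[String, String]): Format[Any]
}

object FormatDiscovery {
  // Real JDK API: ServiceLoader instantiates every class listed in
  // META-INF/services/<fully qualified FormatFactory name> on the classpath.
  // Returns an empty Seq if no such service file exists.
  def loadFactories(): Seq[FormatFactory] = {
    val it = ServiceLoader.load(classOf[FormatFactory]).iterator()
    val found = Seq.newBuilder[FormatFactory]
    while (it.hasNext) found += it.next()
    found.result()
  }

  // Returns the single factory whose required context is contained in the properties;
  // fails if none or more than one factory matches.
  def find(factories: Seq[FormatFactory], properties: Map[String, String]): FormatFactory = {
    val matches = factories.filter { f =>
      f.requiredContext.forall { case (k, v) => properties.get(k).contains(v) }
    }
    matches match {
      case Seq(single) => single
      case Seq()       => throw new IllegalArgumentException(s"No format factory matches $properties")
      case _           => throw new IllegalArgumentException(s"Ambiguous format factories for $properties")
    }
  }
}

// Hypothetical connector: it only delivers bytes and delegates parsing to a Format,
// so it needs no format-specific subclass.
final class ByteSource(messages: Seq[Array[Byte]]) {
  def read[T](format: Format[T]): Seq[T] = messages.map(format.deserialize)
}

// Hypothetical example format and its factory.
final class Utf8StringFormat extends Format[String] {
  def deserialize(message: Array[Byte]): String = new String(message, StandardCharsets.UTF_8)
}

final class Utf8StringFormatFactory extends FormatFactory {
  def requiredContext: Map[String, String] = Map("format.type" -> "utf8-string")
  def createFormat(properties: Map[String, String]): Format[Any] = new Utf8StringFormat
}

object FormatDiscoveryExample {
  def main(args: Array[String]): Unit = {
    val properties = Map("connector.type" -> "in-memory", "format.type" -> "utf8-string")
    // In a real setup this list would come from FormatDiscovery.loadFactories().
    val factory = FormatDiscovery.find(Seq(new Utf8StringFormatFactory), properties)
    val source = new ByteSource(Seq("click-1", "click-2").map(_.getBytes(StandardCharsets.UTF_8)))
    println(source.read(factory.createFormat(properties))) // List(click-1, click-2)
  }
}
```

Because the connector never inspects the byte layout, one source class can be combined with any discovered format, which is the separation that would make format-specific classes such as {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}} unnecessary.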



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
