[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538426#comment-16538426 ]
ASF GitHub Bot commented on FLINK-8558:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6264#discussion_r201307506
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
---
@@ -110,16 +111,44 @@ abstract class BatchTableEnvironment(
}
}
-// TODO expose this once we have enough table source factories that can deal with it
-// /**
-// * Creates a table from a descriptor that describes the source connector, source encoding,
-// * the resulting table schema, and other properties.
-// *
-// * @param connectorDescriptor connector descriptor describing the source of the table
-// */
-// def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
-// new BatchTableSourceDescriptor(this, connectorDescriptor)
-// }
+ /**
+ * Creates a table from a descriptor that describes the source connector, the source format,
+ * the resulting table schema, and other properties.
+ *
+ * Descriptors allow for declaring communication to external systems in an
+ * implementation-agnostic way. The classpath is scanned for connectors and matching connectors
+ * are configured accordingly.
+ *
+ * The following example shows how to read from a Kafka connector using a JSON format and
+ * creating a table:
+ *
+ * {{{
+ *
+ *   tableEnv
+ *     .from(
+ *       new Kafka()
+ *         .version("0.11")
+ *         .topic("clicks")
+ *         .property("zookeeper.connect", "localhost")
+ *         .property("group.id", "click-group")
+ *         .startFromEarliest())
+ *     .withFormat(
+ *       new Json()
+ *         .jsonSchema("{...}")
+ *         .failOnMissingField(false))
+ *     .withSchema(
+ *       new Schema()
+ *         .field("user-name", "VARCHAR").from("u_name")
+ *         .field("count", "DECIMAL")
+ *         .field("proc-time", "TIMESTAMP").proctime())
+ *     .toTable()
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the source of the table
+ */
+ def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
--- End diff --
Once we reimplement the environments in Java and introduce proper interfaces for hiding the implementation, this problem will be gone anyway.
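The Scaladoc in the diff says the classpath is scanned for connectors and matching connectors are configured accordingly. A minimal, self-contained Scala sketch of that matching step follows, assuming a descriptor is flattened into string properties and each factory declares the context keys it requires. The names `ConnectorFactory`, `requiredContext`, `findFactory`, `KafkaFactory`, and `FileFactory`, and the property keys such as `connector.type`, are hypothetical illustrations, not Flink's actual API.

```scala
// Hypothetical sketch: descriptor properties matched against connector factories.
// None of these names are Flink APIs.
trait ConnectorFactory {
  // Key/value pairs that must appear verbatim in the descriptor's properties.
  def requiredContext: Map[String, String]
  // Human-readable name, used only for this illustration.
  def name: String
}

object KafkaFactory extends ConnectorFactory {
  val requiredContext: Map[String, String] =
    Map("connector.type" -> "kafka", "connector.version" -> "0.11")
  val name: String = "kafka-0.11"
}

object FileFactory extends ConnectorFactory {
  val requiredContext: Map[String, String] = Map("connector.type" -> "filesystem")
  val name: String = "filesystem"
}

object FactoryMatching {
  // Returns the single factory whose required context is fully contained in `props`;
  // throws if no factory or more than one factory matches.
  def findFactory(props: Map[String, String], candidates: Seq[ConnectorFactory]): ConnectorFactory = {
    val matches = candidates.filter { f =>
      f.requiredContext.forall { case (k, v) => props.get(k).contains(v) }
    }
    matches match {
      case Seq(single) => single
      case Seq()       => throw new IllegalArgumentException(s"No factory matches: $props")
      case many =>
        throw new IllegalArgumentException(s"Ambiguous factories: ${many.map(_.name).mkString(", ")}")
    }
  }
}
```

Requiring exactly one match makes an ambiguous classpath (two factories for the same connector type and version) fail loudly instead of silently picking one; that policy is an assumption of this sketch.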
> Add unified format interfaces and format discovery
> --------------------------------------------------
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently only {{flink-avro}} is located there, but we will add more formats such as {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of concerns, we want to decouple connectors from formats, e.g., by removing {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to discover available formats on the classpath (similar to how file systems are discovered now). A {{Format}} will provide a method for converting {{byte[]}} to the target record type.
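The issue describes a `FormatFactory` found through Java service loaders and a `Format` that converts `byte[]` into the target record type. A minimal Scala sketch of that shape follows. The trait members (`formatType`, `createFormat`, `deserialize`), the toy `CsvLineFormat`, and the property keys `format.type` and `format.field-delimiter` are hypothetical and do not reproduce Flink's eventual interfaces. `java.util.ServiceLoader` only finds providers listed in a `META-INF/services/<fully-qualified interface name>` file, so the lookup also accepts an explicit list of factories.

```scala
import java.nio.charset.StandardCharsets
import java.util.ServiceLoader
import java.util.regex.Pattern

// Hypothetical interfaces sketching the proposal; not Flink's actual API.
trait Format[T] {
  // Converts the raw bytes of one message into the target record type.
  def deserialize(bytes: Array[Byte]): T
}

trait FormatFactory {
  // Value of the (assumed) "format.type" property this factory handles.
  def formatType: String
  def createFormat(properties: Map[String, String]): Format[_]
}

// Toy format: one delimited UTF-8 line becomes a Seq of fields (trailing empties kept).
class CsvLineFormat(delimiter: String) extends Format[Seq[String]] {
  def deserialize(bytes: Array[Byte]): Seq[String] =
    new String(bytes, StandardCharsets.UTF_8).split(Pattern.quote(delimiter), -1).toSeq
}

// ServiceLoader needs a public no-arg constructor to instantiate a provider.
class CsvFormatFactory extends FormatFactory {
  def formatType: String = "csv"
  def createFormat(properties: Map[String, String]): Format[_] =
    new CsvLineFormat(properties.getOrElse("format.field-delimiter", ","))
}

object FormatDiscovery {
  // Loads every FormatFactory registered under META-INF/services on the classpath.
  def loadFromClasspath(): List[FormatFactory] = {
    val it = ServiceLoader.load(classOf[FormatFactory]).iterator()
    val found = List.newBuilder[FormatFactory]
    while (it.hasNext) found += it.next()
    found.result()
  }

  // Picks the factory whose formatType equals the "format.type" property.
  def find(properties: Map[String, String], factories: List[FormatFactory]): FormatFactory = {
    val wanted = properties.getOrElse("format.type",
      throw new IllegalArgumentException("Missing property 'format.type'"))
    factories.find(_.formatType == wanted).getOrElse(
      throw new IllegalArgumentException(s"No format factory for type '$wanted'"))
  }
}
```

Keying factories by a format property is what would let a connector such as Kafka stay format-agnostic in this sketch: it hands bytes to whichever `Format` was discovered, rather than existing as separate `KafkaJsonTableSource` and `KafkaAvroTableSource` classes.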
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)