flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
Date Wed, 03 Apr 2019 15:48:46 GMT
hequn8128 commented on a change in pull request #8050: [FLINK-11067][table] Convert TableEnvironments to interfaces
URL: https://github.com/apache/flink/pull/8050#discussion_r271810319
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##########
 @@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+/**
+ * The base interface for batch and stream TableEnvironments.
+ *
+ * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
+ * responsible for:
+ *
+ * <ul>
+ *     <li>Registering a Table in the internal catalog</li>
+ *     <li>Registering an external catalog</li>
+ *     <li>Executing SQL queries</li>
+ *     <li>Registering a user-defined (scalar, table, or aggregation) function</li>
+ *     <li>Converting a DataStream or DataSet into a Table</li>
+ *     <li>Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment</li>
+ * </ul>
+ */
+@PublicEvolving
+public interface TableEnvironment {
+
+	/**
+	 * Creates a table from a table source.
+	 *
+	 * @param source table source used as table
+	 */
+	Table fromTableSource(TableSource<?> source);
+
+	/**
+	 * Registers an {@link ExternalCatalog} under a unique name in the TableEnvironment's schema.
+	 * All tables registered in the {@link ExternalCatalog} can be accessed.
+	 *
+	 * @param name            The name under which the externalCatalog will be registered
+	 * @param externalCatalog The externalCatalog to register
+	 */
+	void registerExternalCatalog(String name, ExternalCatalog externalCatalog);
+
+	/**
+	 * Gets a registered {@link ExternalCatalog} by name.
+	 *
+	 * @param name The name to look up the {@link ExternalCatalog}
+	 * @return The {@link ExternalCatalog}
+	 */
+	ExternalCatalog getRegisteredExternalCatalog(String name);
+
+	/**
+	 * Registers a {@link ScalarFunction} under a unique name. Replaces already existing
+	 * user-defined functions under this name.
+	 */
+	void registerFunction(String name, ScalarFunction function);
+
+	/**
+	 * Registers a {@link Table} under a unique name in the TableEnvironment's catalog.
+	 * Registered tables can be referenced in SQL queries.
+	 *
+	 * @param name The name under which the table will be registered.
+	 * @param table The table to register.
+	 */
+	void registerTable(String name, Table table);
+
+	/**
+	 * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
+	 * Registered tables can be referenced in SQL queries.
+	 *
+	 * @param name        The name under which the {@link TableSource} is registered.
+	 * @param tableSource The {@link TableSource} to register.
+	 */
+	void registerTableSource(String name, TableSource<?> tableSource);
+
+	/**
+	 * Registers an external {@link TableSink} with given field names and types in this
+	 * {@link TableEnvironment}'s catalog.
+	 * Registered sink tables can be referenced in SQL DML statements.
+	 *
+	 * @param name The name under which the {@link TableSink} is registered.
+	 * @param fieldNames The field names to register with the {@link TableSink}.
+	 * @param fieldTypes The field types to register with the {@link TableSink}.
+	 * @param tableSink The {@link TableSink} to register.
+	 */
+	void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes,
TableSink<?> tableSink);
+
+	/**
+	 * Registers an external {@link TableSink} with already configured field names and field types in
+	 * this {@link TableEnvironment}'s catalog.
+	 * Registered sink tables can be referenced in SQL DML statements.
+	 *
+	 * @param name The name under which the {@link TableSink} is registered.
+	 * @param configuredSink The configured {@link TableSink} to register.
+	 */
+	void registerTableSink(String name, TableSink<?> configuredSink);
+
+	/**
+	 * Scans a registered table and returns the resulting {@link Table}.
+	 *
+	 * <p>A table to scan must be registered in the TableEnvironment. It can be either directly
+	 * registered as a DataStream, DataSet, or Table, or be a member of an {@link ExternalCatalog}.
+	 *
+	 * <p>Examples:
+	 *
+	 * <p>Scanning a directly registered table.
+	 * <pre>
+	 * {@code
+	 *   Table tab = tableEnv.scan("tableName");
+	 * }
+	 * </pre>
+	 *
+	 * <p>Scanning a table from a registered catalog.
+	 * <pre>
+	 * {@code
+	 *   Table tab = tableEnv.scan("catalogName", "dbName", "tableName");
+	 * }
+	 * </pre>
+	 *
+	 * @param tablePath The path of the table to scan.
+	 * @throws TableException if no table is found using the given table path.
+	 * @return The resulting {@link Table}.
+	 */
+	Table scan(String... tablePath) throws TableException;
+
+	/**
+	 * Creates a table source and/or table sink from a descriptor.
+	 *
+	 * <p>Descriptors allow for declaring the communication to external systems in an
+	 * implementation-agnostic way. The classpath is scanned for suitable table factories that match
+	 * the desired configuration.
+	 *
+	 * <p>The following example shows how to read from a connector using a JSON format and
+	 * register a table source as "MyTable":
+	 *
+	 * <pre>
+	 * {@code
+	 *
+	 * tableEnv
+	 *   .connect(
+	 *     new ExternalSystemXYZ()
+	 *       .version("0.11"))
+	 *   .withFormat(
+	 *     new Json()
+	 *       .jsonSchema("{...}")
+	 *       .failOnMissingField(false))
+	 *   .withSchema(
+	 *     new Schema()
+	 *       .field("user-name", "VARCHAR").from("u_name")
+	 *       .field("count", "DECIMAL"))
+	 *   .registerSource("MyTable");
+	 * }
+	 *</pre>
+	 *
+	 * @param connectorDescriptor connector descriptor describing the external system
+	 */
+	TableDescriptor connect(ConnectorDescriptor connectorDescriptor);
+
+	/**
+	 * Gets the names of all tables registered in this environment.
+	 *
+	 * @return A list of the names of all registered tables.
+	 */
+	String[] listTables();
 
 Review comment:
   I think it only returns directly registered tables; tables in a catalog can be listed through the Catalog APIs.
Does that make sense to you? Maybe we should make the comment more accurate.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message