flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] asfgit closed pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client
Date Tue, 28 Aug 2018 07:46:39 GMT
asfgit closed pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client
URL: https://github.com/apache/flink/pull/6606
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 0e8d2d651b1..f16f1a5561a 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -162,7 +162,7 @@ A SQL query needs a configuration environment in which it is executed. The so-ca
 Every environment file is a regular [YAML file](http://yaml.org/). An example of such a file is presented below.
 
 {% highlight yaml %}
-# Define table sources here.
+# Define table sources and sinks here.
 
 tables:
   - name: MyTableSource
@@ -185,6 +185,12 @@ tables:
       - name: MyField2
         type: VARCHAR
 
+# Define table views here.
+
+views:
+  - name: MyCustomView
+    query: "SELECT MyField2 FROM MyTableSource"
+
 # Define user-defined functions here.
 
 functions:
@@ -217,7 +223,8 @@ deployment:
 
 This configuration:
 
-- defines an environment with a table source `MyTableName` that reads from a CSV file,
+- defines an environment with a table source `MyTableSource` that reads from a CSV file,
+- defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters,
 - specifies a parallelism of 1 for queries executed in this streaming environment,
 - specifies an event-time characteristic, and
@@ -404,7 +411,7 @@ This process can be recursively performed until all the constructor parameters a
 {% top %}
 
 Detached SQL Queries
-------------------------
+--------------------
 
 In order to define end-to-end SQL pipelines, SQL's `INSERT INTO` statement can be used for submitting long-running, detached queries to a Flink cluster. These queries produce their results into an external system instead of the SQL Client. This allows for dealing with higher parallelism and larger amounts of data. The CLI itself does not have any control over a detached query after submission.
 
@@ -459,6 +466,44 @@ Web interface: http://localhost:8081
 
 {% top %}
 
+SQL Views
+---------
+
+Views allow users to define virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement.
+
+Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session.
+
+The following example shows how to define multiple views in a file:
+
+{% highlight yaml %}
+views:
+  - name: MyRestrictedView
+    query: "SELECT MyField2 FROM MyTableSource"
+  - name: MyComplexView
+    query: >
+      SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR)
+      FROM MyTableSource
+      WHERE MyField2 > 200
+{% endhighlight %}
+
+Similar to table sources and sinks, views defined in a session environment file have the highest precedence.
+
+Views can also be created within a CLI session using the `CREATE VIEW` statement:
+
+{% highlight text %}
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+{% endhighlight %}
+
+Views created within a CLI session can also be removed again using the `DROP VIEW` statement:
+
+{% highlight text %}
+DROP VIEW MyNewView
+{% endhighlight %}
+
+<span class="label label-danger">Attention</span> The definition of views is limited to the syntax mentioned above. Defining a schema for views or escaping whitespace in table names will be supported in future versions.
+
+{% top %}
+
 Limitations & Future
 --------------------
 
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 8be4ce63ecb..302651a78aa 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -25,7 +25,7 @@
 
 
 #==============================================================================
-# Table Sources
+# Table Sources & Sinks
 #==============================================================================
 
 # Define table sources and sinks here.
@@ -38,6 +38,17 @@ tables: [] # empty list
 #   format: ...
 #   schema: ...
 
+#==============================================================================
+# Table Views
+#==============================================================================
+
+# Define frequently used SQL queries here that define a virtual table.
+
+views: [] # empty list
+# A typical view definition looks like:
+# - name: ...
+#   query: "SELECT ..."
+
 #==============================================================================
 # User-defined functions
 #==============================================================================
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 3efaa6a3352..b9aca75d325 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SessionContext;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
 
 import org.slf4j.Logger;
@@ -46,7 +47,7 @@
  * and allows for managing queries via console.
  *
  * <p>For debugging in an IDE you can execute the main method of this class using:
- * "embedded --defaults /path/to/-sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
+ * "embedded --defaults /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
  *
  * <p>Make sure that the FLINK_CONF_DIR environment variable is set.
  */
@@ -94,6 +95,9 @@ private void start() {
 				context = new SessionContext(options.getSessionId(), sessionEnv);
 			}
 
+			// validate the environment (defaults and session)
+			validateEnvironment(context, executor);
+
 			// add shutdown hook
 			Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
 
@@ -127,6 +131,17 @@ private void openCli(SessionContext context, Executor executor) {
 
 	// --------------------------------------------------------------------------------------------
 
+	private static void validateEnvironment(SessionContext context, Executor executor) {
+		System.out.print("Validating current environment...");
+		try {
+			executor.validateSession(context);
+			System.out.println("done.");
+		} catch (SqlExecutionException e) {
+			throw new SqlClientException(
+				"Current environment is invalid. Please check your configuration files again.", e);
+		}
+	}
+
 	private static void shutdown(SessionContext context, Executor executor) {
 		System.out.println();
 		System.out.print("Shutting down executor...");
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 576f5189c0b..8030c7c75da 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -203,8 +203,7 @@ public boolean submitUpdate(String statement) {
 				case INSERT_INTO:
 					return callInsertInto(cmdCall);
 				default:
-					terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNSUPPORTED_SQL).toAnsi());
-					terminal.flush();
+					printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
 					return false;
 			}
 		}).orElse(false);
@@ -215,8 +214,7 @@ public boolean submitUpdate(String statement) {
 	private Optional<SqlCommandCall> parseCommand(String line) {
 		final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(line);
 		if (!parsedLine.isPresent()) {
-			terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
-			terminal.flush();
+			printError(CliStrings.MESSAGE_UNKNOWN_SQL);
 		}
 		return parsedLine;
 	}
@@ -224,26 +222,25 @@ public boolean submitUpdate(String statement) {
 	private void callCommand(SqlCommandCall cmdCall) {
 		switch (cmdCall.command) {
 			case QUIT:
-			case EXIT:
-				callQuit(cmdCall);
+				callQuit();
 				break;
 			case CLEAR:
-				callClear(cmdCall);
+				callClear();
 				break;
 			case RESET:
-				callReset(cmdCall);
+				callReset();
 				break;
 			case SET:
 				callSet(cmdCall);
 				break;
 			case HELP:
-				callHelp(cmdCall);
+				callHelp();
 				break;
 			case SHOW_TABLES:
-				callShowTables(cmdCall);
+				callShowTables();
 				break;
 			case SHOW_FUNCTIONS:
-				callShowFunctions(cmdCall);
+				callShowFunctions();
 				break;
 			case DESCRIBE:
 				callDescribe(cmdCall);
@@ -257,6 +254,12 @@ private void callCommand(SqlCommandCall cmdCall) {
 			case INSERT_INTO:
 				callInsertInto(cmdCall);
 				break;
+			case CREATE_VIEW:
+				callCreateView(cmdCall);
+				break;
+			case DROP_VIEW:
+				callDropView(cmdCall);
+				break;
 			case SOURCE:
 				callSource(cmdCall);
 				break;
@@ -265,19 +268,18 @@ private void callCommand(SqlCommandCall cmdCall) {
 		}
 	}
 
-	private void callQuit(SqlCommandCall cmdCall) {
-		terminal.writer().println(CliStrings.MESSAGE_QUIT);
-		terminal.flush();
+	private void callQuit() {
+		printInfo(CliStrings.MESSAGE_QUIT);
 		isRunning = false;
 	}
 
-	private void callClear(SqlCommandCall cmdCall) {
+	private void callClear() {
 		clearTerminal();
 	}
 
-	private void callReset(SqlCommandCall cmdCall) {
+	private void callReset() {
 		context.resetSessionProperties();
-		terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESET).toAnsi());
+		printInfo(CliStrings.MESSAGE_RESET);
 	}
 
 	private void callSet(SqlCommandCall cmdCall) {
@@ -287,7 +289,7 @@ private void callSet(SqlCommandCall cmdCall) {
 			try {
 				properties = executor.getSessionProperties(context);
 			} catch (SqlExecutionException e) {
-				printException(e);
+				printExecutionException(e);
 				return;
 			}
 			if (properties.isEmpty()) {
@@ -309,17 +311,17 @@ private void callSet(SqlCommandCall cmdCall) {
 		terminal.flush();
 	}
 
-	private void callHelp(SqlCommandCall cmdCall) {
+	private void callHelp() {
 		terminal.writer().println(CliStrings.MESSAGE_HELP);
 		terminal.flush();
 	}
 
-	private void callShowTables(SqlCommandCall cmdCall) {
+	private void callShowTables() {
 		final List<String> tables;
 		try {
 			tables = executor.listTables(context);
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 			return;
 		}
 		if (tables.isEmpty()) {
@@ -330,12 +332,12 @@ private void callShowTables(SqlCommandCall cmdCall) {
 		terminal.flush();
 	}
 
-	private void callShowFunctions(SqlCommandCall cmdCall) {
+	private void callShowFunctions() {
 		final List<String> functions;
 		try {
 			functions = executor.listUserDefinedFunctions(context);
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 			return;
 		}
 		if (functions.isEmpty()) {
@@ -351,7 +353,7 @@ private void callDescribe(SqlCommandCall cmdCall) {
 		try {
 			schema = executor.getTableSchema(context, cmdCall.operands[0]);
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 			return;
 		}
 		terminal.writer().println(schema.toString());
@@ -363,7 +365,7 @@ private void callExplain(SqlCommandCall cmdCall) {
 		try {
 			explanation = executor.explainStatement(context, cmdCall.operands[0]);
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 			return;
 		}
 		terminal.writer().println(explanation);
@@ -375,7 +377,7 @@ private void callSelect(SqlCommandCall cmdCall) {
 		try {
 			resultDesc = executor.executeQuery(context, cmdCall.operands[0]);
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 			return;
 		}
 		final CliResultView view;
@@ -390,16 +392,14 @@ private void callSelect(SqlCommandCall cmdCall) {
 			view.open();
 
 			// view left
-			terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESULT_QUIT).toAnsi());
-			terminal.flush();
+			printInfo(CliStrings.MESSAGE_RESULT_QUIT);
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 		}
 	}
 
 	private boolean callInsertInto(SqlCommandCall cmdCall) {
-		terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
-		terminal.flush();
+		printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
 
 		try {
 			final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
@@ -407,20 +407,58 @@ private boolean callInsertInto(SqlCommandCall cmdCall) {
 			terminal.writer().println(programTarget.toString());
 			terminal.flush();
 		} catch (SqlExecutionException e) {
-			printException(e);
+			printExecutionException(e);
 			return false;
 		}
 		return true;
 	}
 
-	private void callSource(SqlCommandCall cmdCall) {
-		final String pathString = cmdCall.operands[0];
+	private void callCreateView(SqlCommandCall cmdCall) {
+		final String name = cmdCall.operands[0];
+		final String query = cmdCall.operands[1];
 
-		if (pathString.isEmpty()) {
-			printError(CliStrings.MESSAGE_INVALID_PATH);
+		final String previousQuery = context.getViews().get(name);
+		if (previousQuery != null) {
+			printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
 			return;
 		}
 
+		try {
+			// perform and validate change
+			context.addView(name, query);
+			executor.validateSession(context);
+			printInfo(CliStrings.MESSAGE_VIEW_CREATED);
+		} catch (SqlExecutionException e) {
+			// rollback change
+			context.removeView(name);
+			printExecutionException(e);
+		}
+	}
+
+	private void callDropView(SqlCommandCall cmdCall) {
+		final String name = cmdCall.operands[0];
+		final String query = context.getViews().get(name);
+
+		if (query == null) {
+			printExecutionError(CliStrings.MESSAGE_VIEW_NOT_FOUND);
+			return;
+		}
+
+		try {
+			// perform and validate change
+			context.removeView(name);
+			executor.validateSession(context);
+			printInfo(CliStrings.MESSAGE_VIEW_REMOVED);
+		} catch (SqlExecutionException e) {
+			// rollback change
+			context.addView(name, query);
+			printExecutionException(CliStrings.MESSAGE_VIEW_NOT_REMOVED, e);
+		}
+	}
+
+	private void callSource(SqlCommandCall cmdCall) {
+		final String pathString = cmdCall.operands[0];
+
 		// load file
 		final String stmt;
 		try {
@@ -428,13 +466,13 @@ private void callSource(SqlCommandCall cmdCall) {
 			byte[] encoded = Files.readAllBytes(path);
 			stmt = new String(encoded, Charset.defaultCharset());
 		} catch (IOException e) {
-			printException(e);
+			printExecutionException(e);
 			return;
 		}
 
 		// limit the output a bit
 		if (stmt.length() > SOURCE_MAX_SIZE) {
-			printError(CliStrings.MESSAGE_MAX_SIZE_EXCEEDED);
+			printExecutionError(CliStrings.MESSAGE_MAX_SIZE_EXCEEDED);
 			return;
 		}
 
@@ -449,14 +487,38 @@ private void callSource(SqlCommandCall cmdCall) {
 
 	// --------------------------------------------------------------------------------------------
 
-	private void printException(Throwable t) {
-		LOG.warn(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t);
-		terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t).toAnsi());
+	private void printExecutionException(Throwable t) {
+		printExecutionException(null, t);
+	}
+
+	private void printExecutionException(String message, Throwable t) {
+		final String finalMessage;
+		if (message == null) {
+			finalMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
+		} else {
+			finalMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR + ' ' + message;
+		}
+		printException(finalMessage, t);
+	}
+
+	private void printExecutionError(String message) {
+		terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, message).toAnsi());
+		terminal.flush();
+	}
+
+	private void printException(String message, Throwable t) {
+		LOG.warn(message, t);
+		terminal.writer().println(CliStrings.messageError(message, t).toAnsi());
+		terminal.flush();
+	}
+
+	private void printError(String message) {
+		terminal.writer().println(CliStrings.messageError(message).toAnsi());
 		terminal.flush();
 	}
 
-	private void printError(String e) {
-		terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, e).toAnsi());
+	private void printInfo(String message) {
+		terminal.writer().println(CliStrings.messageInfo(message).toAnsi());
 		terminal.flush();
 	}
 }
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index aef76698449..42c5f715e58 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -50,6 +50,8 @@ private CliStrings() {
 		.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
 		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
 		.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
+		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>'"))
+		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>'"))
 		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
 		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
 		.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
@@ -115,7 +117,7 @@ private CliStrings() {
 
 	public static final String MESSAGE_EMPTY = "Result was empty.";
 
-	public static final String MESSAGE_UNKNOWN_SQL = "Unknown SQL statement.";
+	public static final String MESSAGE_UNKNOWN_SQL = "Unknown or invalid SQL statement.";
 
 	public static final String MESSAGE_UNKNOWN_TABLE = "Unknown table.";
 
@@ -127,14 +129,25 @@ private CliStrings() {
 
 	public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:";
 
-	public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
-
 	public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
 
 	public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
 
 	public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL statement.";
 
+	public static final String MESSAGE_VIEW_CREATED = "View has been created.";
+
+	public static final String MESSAGE_VIEW_REMOVED = "View has been removed.";
+
+	public static final String MESSAGE_VIEW_ALREADY_EXISTS = "A view with this name has already been defined in the current CLI session.";
+
+	public static final String MESSAGE_VIEW_NOT_FOUND = "The given view does not exist in the current CLI session. " +
+		"Only views created with a CREATE VIEW statement can be accessed.";
+
+	public static final String MESSAGE_VIEW_NOT_REMOVED = "The given view cannot be removed without affecting other views.";
+
+	public static final String MESSAGE_ENVIRONMENT_INVALID = "The configured environment is invalid. Please check your environment files again.";
+
 	// --------------------------------------------------------------------------------------------
 
 	public static final String RESULT_TITLE = "SQL Query Result";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 376b1f1558e..83fbabc0931 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.table.client.cli;
 
+import java.util.Arrays;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Simple parser for determining the type of command and its parameters.
@@ -30,86 +35,121 @@ private SqlCommandParser() {
 	}
 
 	public static Optional<SqlCommandCall> parse(String stmt) {
-		String trimmed = stmt.trim();
+		// normalize
+		stmt = stmt.trim();
 		// remove ';' at the end because many people type it intuitively
-		if (trimmed.endsWith(";")) {
-			trimmed = trimmed.substring(0, trimmed.length() - 1);
+		if (stmt.endsWith(";")) {
+			stmt = stmt.substring(0, stmt.length() - 1).trim();
 		}
+
+		// parse
 		for (SqlCommand cmd : SqlCommand.values()) {
-			int pos = 0;
-			int tokenCount = 0;
-			for (String token : trimmed.split("\\s")) {
-				pos += token.length() + 1; // include space character
-				// check for content
-				if (token.length() > 0) {
-					// match
-					if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
-						if (tokenCount == cmd.tokens.length - 1) {
-							final SqlCommandCall call = new SqlCommandCall(
-								cmd,
-								splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
-							);
-							return Optional.of(call);
-						}
-					} else {
-						// next sql command
-						break;
-					}
-					tokenCount++; // check next token
+			final Matcher matcher = cmd.pattern.matcher(stmt);
+			if (matcher.matches()) {
+				final String[] groups = new String[matcher.groupCount()];
+				for (int i = 0; i < groups.length; i++) {
+					groups[i] = matcher.group(i + 1);
 				}
+				return cmd.operandConverter.apply(groups)
+					.map((operands) -> new SqlCommandCall(cmd, operands));
 			}
 		}
 		return Optional.empty();
 	}
 
-	private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
-		switch (cmd) {
-			case SET:
-				final int delimiter = operands.indexOf('=');
-				if (delimiter < 0) {
-					return new String[] {};
-				} else {
-					return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
-				}
-			case SELECT:
-			case INSERT_INTO:
-				return new String[] {originalCall};
-			default:
-				return new String[] {operands};
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 
+	private static final Function<String[], Optional<String[]>> NO_OPERANDS =
+		(operands) -> Optional.of(new String[0]);
+
+	private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
+		(operands) -> Optional.of(new String[]{operands[0]});
+
+	private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+
 	/**
 	 * Supported SQL commands.
 	 */
 	enum SqlCommand {
-		QUIT("quit"),
-		EXIT("exit"),
-		CLEAR("clear"),
-		HELP("help"),
-		SHOW_TABLES("show tables"),
-		SHOW_FUNCTIONS("show functions"),
-		DESCRIBE("describe"),
-		EXPLAIN("explain"),
-		SELECT("select"),
-		INSERT_INTO("insert into"),
-		SET("set"),
-		RESET("reset"),
-		SOURCE("source");
-
-		public final String command;
-		public final String[] tokens;
-
-		SqlCommand(String command) {
-			this.command = command;
-			this.tokens = command.split(" ");
+		QUIT(
+			"(QUIT|EXIT)",
+			NO_OPERANDS),
+
+		CLEAR(
+			"CLEAR",
+			NO_OPERANDS),
+
+		HELP(
+			"HELP",
+			NO_OPERANDS),
+
+		SHOW_TABLES(
+			"SHOW\\s+TABLES",
+			NO_OPERANDS),
+
+		SHOW_FUNCTIONS(
+			"SHOW\\s+FUNCTIONS",
+			NO_OPERANDS),
+
+		DESCRIBE(
+			"DESCRIBE\\s+(.*)",
+			SINGLE_OPERAND),
+
+		EXPLAIN(
+			"EXPLAIN\\s+(.*)",
+			SINGLE_OPERAND),
+
+		SELECT(
+			"(SELECT.*)",
+			SINGLE_OPERAND),
+
+		INSERT_INTO(
+			"(INSERT\\s+INTO.*)",
+			SINGLE_OPERAND),
+
+		CREATE_VIEW(
+			"CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
+			(operands) -> {
+				if (operands.length < 2) {
+					return Optional.empty();
+				}
+				return Optional.of(new String[]{operands[0], operands[1]});
+			}),
+
+		DROP_VIEW(
+			"DROP\\s+VIEW\\s+(.*)",
+			SINGLE_OPERAND),
+
+		SET(
+			"SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
+			(operands) -> {
+				if (operands.length < 3) {
+					return Optional.empty();
+				} else if (operands[0] == null) {
+					return Optional.of(new String[0]);
+				}
+				return Optional.of(new String[]{operands[1], operands[2]});
+			}),
+
+		RESET(
+			"RESET",
+			NO_OPERANDS),
+
+		SOURCE(
+			"SOURCE\\s+(.*)",
+			SINGLE_OPERAND);
+
+		public final Pattern pattern;
+		public final Function<String[], Optional<String[]>> operandConverter;
+
+		SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
+			this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+			this.operandConverter = operandConverter;
 		}
 
 		@Override
 		public String toString() {
-			return command.toUpperCase();
+			return super.toString().replace('_', ' ');
 		}
 	}
 
@@ -124,5 +164,33 @@ public SqlCommandCall(SqlCommand command, String[] operands) {
 			this.command = command;
 			this.operands = operands;
 		}
+
+		public SqlCommandCall(SqlCommand command) {
+			this(command, new String[0]);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			SqlCommandCall that = (SqlCommandCall) o;
+			return command == that.command && Arrays.equals(operands, that.operands);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = Objects.hash(command);
+			result = 31 * result + Arrays.hashCode(operands);
+			return result;
+		}
+
+		@Override
+		public String toString() {
+			return command + "(" + Arrays.toString(operands) + ")";
+		}
 	}
 }
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
index 337d8035915..31cf946960d 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/ConfigUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.config;
 
+import org.apache.flink.table.client.SqlClientException;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.IOContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -40,6 +42,20 @@ private ConfigUtil() {
 		// private
 	}
 
+	/**
+	 * Extracts an early string property from YAML (before normalization).
+	 */
+	public static String extractEarlyStringProperty(Map<String, Object> map, String key, String parent) {
+		if (!map.containsKey(key)) {
+			throw new SqlClientException("The '" + key + "' attribute of " + parent + " is missing.");
+		}
+		final Object object = map.get(key);
+		if (object == null || !(object instanceof String) || ((String) object).trim().length() <= 0) {
+			throw new SqlClientException("Invalid " + parent + " " + key + " '" + object + "'.");
+		}
+		return (String) object;
+	}
+
 	/**
 	 * Normalizes key-value properties from Yaml in the normalized format of the Table API.
 	 */
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 0853afbb45e..b0d1528a4c8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -26,9 +26,12 @@
 import java.net.URL;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.client.config.ConfigUtil.extractEarlyStringProperty;
+
 /**
  * Environment configuration that represents the content of an environment file. Environment files
  * define tables, execution, and deployment behavior. An environment might be defined by default or
@@ -41,6 +44,8 @@
 
 	private Map<String, TableDescriptor> tables;
 
+	private Map<String, String> views;
+
 	private Map<String, UserDefinedFunction> functions;
 
 	private Execution execution;
@@ -52,9 +57,12 @@
 	private static final String TABLE_TYPE_VALUE_SOURCE = "source";
 	private static final String TABLE_TYPE_VALUE_SINK = "sink";
 	private static final String TABLE_TYPE_VALUE_BOTH = "both";
+	private static final String VIEW_NAME = "name";
+	private static final String VIEW_QUERY = "query";
 
 	public Environment() {
 		this.tables = Collections.emptyMap();
+		this.views = Collections.emptyMap();
 		this.functions = Collections.emptyMap();
 		this.execution = new Execution();
 		this.deployment = new Deployment();
@@ -67,24 +75,38 @@ public Environment() {
 	public void setTables(List<Map<String, Object>> tables) {
 		this.tables = new HashMap<>(tables.size());
 		tables.forEach(config -> {
-			if (!config.containsKey(TABLE_NAME)) {
-				throw new SqlClientException("The 'name' attribute of a table is missing.");
-			}
-			final Object nameObject = config.get(TABLE_NAME);
-			if (nameObject == null || !(nameObject instanceof String) || ((String) nameObject).length() <= 0) {
-				throw new SqlClientException("Invalid table name '" + nameObject + "'.");
-			}
-			final String name = (String) nameObject;
+			final String name = extractEarlyStringProperty(config, TABLE_NAME, "table");
 			final Map<String, Object> properties = new HashMap<>(config);
 			properties.remove(TABLE_NAME);
 
-			if (this.tables.containsKey(name)) {
-				throw new SqlClientException("Duplicate table name '" + name + "'.");
+			if (this.tables.containsKey(name) || this.views.containsKey(name)) {
+				throw new SqlClientException("Cannot create table '" + name + "' because a table or " +
+					"view with this name is already registered.");
 			}
 			this.tables.put(name, createTableDescriptor(name, properties));
 		});
 	}
 
+	public Map<String, String> getViews() {
+		return views;
+	}
+
+	public void setViews(List<Map<String, Object>> views) {
+		// the order of how views are registered matters because
+		// they might reference each other
+		this.views = new LinkedHashMap<>(views.size());
+		views.forEach(config -> {
+			final String name = extractEarlyStringProperty(config, VIEW_NAME, "view");
+			final String query = extractEarlyStringProperty(config, VIEW_QUERY, "view");
+
+			if (this.tables.containsKey(name) || this.views.containsKey(name)) {
+				throw new SqlClientException("Cannot create view '" + name + "' because a table or " +
+					"view with this name is already registered.");
+			}
+			this.views.put(name, query);
+		});
+	}
+
 	public Map<String, UserDefinedFunction> getFunctions() {
 		return functions;
 	}
@@ -126,6 +148,11 @@ public String toString() {
 			table.addProperties(props);
 			props.asMap().forEach((k, v) -> sb.append("  ").append(k).append(": ").append(v).append('\n'));
 		});
+		sb.append("===================== Views =====================\n");
+		views.forEach((name, query) -> {
+			sb.append("- name: ").append(name).append("\n");
+			sb.append("  ").append(VIEW_QUERY).append(": ").append(query).append('\n');
+		});
 		sb.append("=================== Functions ====================\n");
 		functions.forEach((name, function) -> {
 			sb.append("- name: ").append(name).append("\n");
@@ -167,6 +194,11 @@ public static Environment merge(Environment env1, Environment env2) {
 		tables.putAll(env2.getTables());
 		mergedEnv.tables = tables;
 
+		// merge views
+		final LinkedHashMap<String, String> views = new LinkedHashMap<>(env1.getViews());
+		views.putAll(env2.getViews());
+		mergedEnv.views = views;
+
 		// merge functions
 		final Map<String, UserDefinedFunction> functions = new HashMap<>(env1.getFunctions());
 		functions.putAll(env2.getFunctions());
@@ -182,9 +214,12 @@ public static Environment merge(Environment env1, Environment env2) {
 	}
 
 	/**
-	 * Enriches an environment with new/modified properties and returns the new instance.
+	 * Enriches an environment with new/modified properties or views and returns the new instance.
 	 */
-	public static Environment enrich(Environment env, Map<String, String> properties) {
+	public static Environment enrich(
+			Environment env,
+			Map<String, String> properties,
+			Map<String, String> views) {
 		final Environment enrichedEnv = new Environment();
 
 		// merge tables
@@ -199,6 +234,10 @@ public static Environment enrich(Environment env, Map<String, String> properties
 		// enrich deployment properties
 		enrichedEnv.deployment = Deployment.enrich(env.deployment, properties);
 
+		// enrich views
+		enrichedEnv.views = new LinkedHashMap<>(env.getViews());
+		enrichedEnv.views.putAll(views);
+
 		return enrichedEnv;
 	}
 
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
index 8652d40b484..8a8c1a1d695 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UserDefinedFunction.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.table.client.config;
 
-import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.FunctionDescriptor;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.client.config.ConfigUtil.extractEarlyStringProperty;
+
 /**
  * Descriptor for user-defined functions.
  */
@@ -33,7 +34,7 @@
 	private String name;
 	private Map<String, String> properties;
 
-	private static final String NAME = "name";
+	private static final String FUNCTION_NAME = "name";
 
 	private UserDefinedFunction(String name, Map<String, String> properties) {
 		this.name = name;
@@ -52,16 +53,10 @@ public String getName() {
 	 * Creates a user-defined function descriptor with the given config.
 	 */
 	public static UserDefinedFunction create(Map<String, Object> config) {
-		if (!config.containsKey(NAME)) {
-			throw new SqlClientException("The 'name' attribute of a function is missing.");
-		}
-		final Object name = config.get(NAME);
-		if (name == null || !(name instanceof String) || ((String) name).trim().length() <= 0) {
-			throw new SqlClientException("Invalid function name '" + name + "'.");
-		}
+		final String name = extractEarlyStringProperty(config, FUNCTION_NAME, "function");
 		final Map<String, Object> properties = new HashMap<>(config);
-		properties.remove(NAME);
-		return new UserDefinedFunction((String) name, ConfigUtil.normalizeYaml(properties));
+		properties.remove(FUNCTION_NAME);
+		return new UserDefinedFunction(name, ConfigUtil.normalizeYaml(properties));
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 3a4dd81bfb8..d6410161c93 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -97,6 +97,11 @@
 	 */
 	ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException;
 
+	/**
+	 * Validates the current session. For example, it checks whether all views are still valid.
+	 */
+	void validateSession(SessionContext session) throws SqlExecutionException;
+
 	/**
 	 * Stops the executor.
 	 */
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index 6fa640f53a9..d154df3deeb 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -20,7 +20,9 @@
 
 import org.apache.flink.table.client.config.Environment;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -30,13 +32,20 @@
 public class SessionContext {
 
 	private final String name;
+
 	private final Environment defaultEnvironment;
+
 	private final Map<String, String> sessionProperties;
 
+	private final Map<String, String> views;
+
 	public SessionContext(String name, Environment defaultEnvironment) {
 		this.name = name;
 		this.defaultEnvironment = defaultEnvironment;
 		this.sessionProperties = new HashMap<>();
+		// the order of how views are registered matters because
+		// they might reference each other
+		this.views = new LinkedHashMap<>();
 	}
 
 	public void setSessionProperty(String key, String value) {
@@ -47,18 +56,33 @@ public void resetSessionProperties() {
 		sessionProperties.clear();
 	}
 
+	public void addView(String name, String query) {
+		views.put(name, query);
+	}
+
+	public void removeView(String name) {
+		views.remove(name);
+	}
+
+	public Map<String, String> getViews() {
+		return Collections.unmodifiableMap(views);
+	}
+
 	public String getName() {
 		return name;
 	}
 
 	public Environment getEnvironment() {
-		// enrich with session properties
-		return Environment.enrich(defaultEnvironment, sessionProperties);
+		return Environment.enrich(
+			defaultEnvironment,
+			sessionProperties,
+			views);
 	}
 
 	public SessionContext copy() {
 		final SessionContext session = new SessionContext(name, defaultEnvironment);
 		session.sessionProperties.putAll(sessionProperties);
+		session.views.putAll(views);
 		return session;
 	}
 
@@ -73,11 +97,16 @@ public boolean equals(Object o) {
 		SessionContext context = (SessionContext) o;
 		return Objects.equals(name, context.name) &&
 			Objects.equals(defaultEnvironment, context.defaultEnvironment) &&
-			Objects.equals(sessionProperties, context.sessionProperties);
+			Objects.equals(sessionProperties, context.sessionProperties) &&
+			Objects.equals(views, context.views);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(name, defaultEnvironment, sessionProperties);
+		return Objects.hash(
+			name,
+			defaultEnvironment,
+			sessionProperties,
+			views);
 	}
 }
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 9ff683755d2..85b3e9265a8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -42,6 +42,7 @@
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.client.config.Deployment;
@@ -309,6 +310,18 @@ private EnvironmentInstance() {
 					}
 				});
 			}
+
+			// register views
+			mergedEnv.getViews().forEach((name, query) -> {
+				// if registering a view fails at this point
+				// it means that it accesses tables that are not available anymore
+				try {
+					tableEnv.registerTable(name, tableEnv.sqlQuery(query));
+				} catch (ValidationException e) {
+					throw new SqlExecutionException(
+						"Invalid view '" + name + "' with query:\n" + query + "\nCause: " + e.getMessage());
+				}
+			});
 		}
 
 		public QueryConfig getQueryConfig() {
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index b2e82715bd5..3b9e8e99b82 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -288,6 +288,12 @@ public ProgramTargetDescriptor executeUpdate(SessionContext session, String stat
 		return executeUpdateInternal(context, statement);
 	}
 
+	@Override
+	public void validateSession(SessionContext session) throws SqlExecutionException {
+		// throws exceptions if an environment cannot be created with the given session context
+		getOrCreateExecutionContext(session).createEnvironmentInstance();
+	}
+
 	@Override
 	public void stop(SessionContext session) {
 		resultStore.getResults().forEach((resultId) -> {
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 99318bee996..90c91959ac3 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -153,6 +153,11 @@ public ProgramTargetDescriptor executeUpdate(SessionContext session, String stat
 			return new ProgramTargetDescriptor("testClusterId", "testJobId", "http://testcluster:1234");
 		}
 
+		@Override
+		public void validateSession(SessionContext session) throws SqlExecutionException {
+			// nothing to do
+		}
+
 		@Override
 		public void stop(SessionContext session) {
 			// nothing to do
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
new file mode 100644
index 00000000000..9ad7fe9d1fb
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
+import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SqlCommandParser}.
+ */
+public class SqlCommandParserTest {
+
+	@Test
+	public void testCommands() {
+		testValidSqlCommand("QUIT;", new SqlCommandCall(SqlCommand.QUIT));
+		testValidSqlCommand("eXiT", new SqlCommandCall(SqlCommand.QUIT));
+		testValidSqlCommand("CLEAR", new SqlCommandCall(SqlCommand.CLEAR));
+		testValidSqlCommand("SHOW TABLES", new SqlCommandCall(SqlCommand.SHOW_TABLES));
+		testValidSqlCommand("  SHOW   TABLES   ", new SqlCommandCall(SqlCommand.SHOW_TABLES));
+		testValidSqlCommand("SHOW FUNCTIONS", new SqlCommandCall(SqlCommand.SHOW_FUNCTIONS));
+		testValidSqlCommand("  SHOW    FUNCTIONS   ", new SqlCommandCall(SqlCommand.SHOW_FUNCTIONS));
+		testValidSqlCommand("DESCRIBE MyTable", new SqlCommandCall(SqlCommand.DESCRIBE, new String[]{"MyTable"}));
+		testValidSqlCommand("DESCRIBE         MyTable     ", new SqlCommandCall(SqlCommand.DESCRIBE, new String[]{"MyTable"}));
+		testInvalidSqlCommand("DESCRIBE  "); // no table name
+		testValidSqlCommand(
+			"EXPLAIN SELECT complicated FROM table",
+			new SqlCommandCall(SqlCommand.EXPLAIN, new String[]{"SELECT complicated FROM table"}));
+		testInvalidSqlCommand("EXPLAIN  "); // no query
+		testValidSqlCommand(
+			"SELECT complicated FROM table",
+			new SqlCommandCall(SqlCommand.SELECT, new String[]{"SELECT complicated FROM table"}));
+		testValidSqlCommand(
+			"   SELECT  complicated FROM table    ",
+			new SqlCommandCall(SqlCommand.SELECT, new String[]{"SELECT  complicated FROM table"}));
+		testValidSqlCommand(
+			"INSERT INTO other SELECT 1+1",
+			new SqlCommandCall(SqlCommand.INSERT_INTO, new String[]{"INSERT INTO other SELECT 1+1"}));
+		testValidSqlCommand(
+			"CREATE VIEW x AS SELECT 1+1",
+			new SqlCommandCall(SqlCommand.CREATE_VIEW, new String[]{"x", "SELECT 1+1"}));
+		testValidSqlCommand(
+			"CREATE   VIEW    MyTable   AS     SELECT 1+1 FROM y",
+			new SqlCommandCall(SqlCommand.CREATE_VIEW, new String[]{"MyTable", "SELECT 1+1 FROM y"}));
+		testInvalidSqlCommand("CREATE VIEW x SELECT 1+1"); // missing AS
+		testValidSqlCommand("DROP VIEW MyTable", new SqlCommandCall(SqlCommand.DROP_VIEW, new String[]{"MyTable"}));
+		testValidSqlCommand("DROP VIEW  MyTable", new SqlCommandCall(SqlCommand.DROP_VIEW, new String[]{"MyTable"}));
+		testInvalidSqlCommand("DROP VIEW");
+		testValidSqlCommand("SET", new SqlCommandCall(SqlCommand.SET));
+		testValidSqlCommand("SET x=y", new SqlCommandCall(SqlCommand.SET, new String[] {"x", "y"}));
+		testValidSqlCommand("SET    x  = y", new SqlCommandCall(SqlCommand.SET, new String[] {"x", " y"}));
+		testValidSqlCommand("reset;", new SqlCommandCall(SqlCommand.RESET));
+		testValidSqlCommand("source /my/file", new SqlCommandCall(SqlCommand.SOURCE, new String[] {"/my/file"}));
+		testInvalidSqlCommand("source"); // missing path
+	}
+
+	private void testInvalidSqlCommand(String stmt) {
+		final Optional<SqlCommandCall> actualCall = SqlCommandParser.parse(stmt);
+		if (actualCall.isPresent()) {
+			fail();
+		}
+	}
+
+	private void testValidSqlCommand(String stmt, SqlCommandCall expectedCall) {
+		final Optional<SqlCommandCall> actualCall = SqlCommandParser.parse(stmt);
+		if (!actualCall.isPresent()) {
+			fail();
+		}
+		assertEquals(expectedCall, actualCall.get());
+	}
+}
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 04575c696a2..6752abfd195 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -79,10 +79,11 @@ public void testFunctions() throws Exception {
 	}
 
 	@Test
-	public void testSourceSinks() throws Exception {
+	public void testTables() throws Exception {
 		final ExecutionContext<?> context = createExecutionContext();
 		final Map<String, TableSource<?>> sources = context.getTableSources();
 		final Map<String, TableSink<?>> sinks = context.getTableSinks();
+		final Map<String, String> views = context.getMergedEnvironment().getViews();
 
 		assertEquals(
 			new HashSet<>(Arrays.asList("TableSourceSink", "TableNumber1", "TableNumber2")),
@@ -92,6 +93,10 @@ public void testSourceSinks() throws Exception {
 			new HashSet<>(Collections.singletonList("TableSourceSink")),
 			sinks.keySet());
 
+		assertEquals(
+			new HashSet<>(Arrays.asList("TestView1", "TestView2")),
+			views.keySet());
+
 		assertArrayEquals(
 			new String[]{"IntegerField1", "StringField1"},
 			sources.get("TableNumber1").getTableSchema().getColumnNames());
@@ -119,7 +124,7 @@ public void testSourceSinks() throws Exception {
 		final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
 
 		assertArrayEquals(
-			new String[]{"TableNumber1", "TableNumber2", "TableSourceSink"},
+			new String[]{"TableNumber1", "TableNumber2", "TableSourceSink", "TestView1", "TestView2"},
 			tableEnv.listTables());
 	}
 
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index ed4ce7a9d8d..76648d08e1a 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -36,6 +36,7 @@
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SessionContext;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.test.util.MiniClusterResource;
@@ -104,6 +105,49 @@ private static Configuration getConfig() {
 		return config;
 	}
 
+	@Test
+	public void testValidateSession() throws Exception {
+		final Executor executor = createDefaultExecutor(clusterClient);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		executor.validateSession(session);
+
+		session.addView("AdditionalView1", "SELECT 1");
+		session.addView("AdditionalView2", "SELECT * FROM AdditionalView1");
+		executor.validateSession(session);
+
+		List<String> actualTables = executor.listTables(session);
+		List<String> expectedTables = Arrays.asList(
+			"AdditionalView1",
+			"AdditionalView2",
+			"TableNumber1",
+			"TableNumber2",
+			"TableSourceSink",
+			"TestView1",
+			"TestView2");
+		assertEquals(expectedTables, actualTables);
+
+		session.removeView("AdditionalView1");
+		try {
+			executor.validateSession(session);
+			fail();
+		} catch (SqlExecutionException e) {
+			// AdditionalView2 needs AdditionalView1
+		}
+
+		session.removeView("AdditionalView2");
+		executor.validateSession(session);
+
+		actualTables = executor.listTables(session);
+		expectedTables = Arrays.asList(
+			"TableNumber1",
+			"TableNumber2",
+			"TableSourceSink",
+			"TestView1",
+			"TestView2");
+		assertEquals(expectedTables, actualTables);
+	}
+
 	@Test
 	public void testListTables() throws Exception {
 		final Executor executor = createDefaultExecutor(clusterClient);
@@ -111,7 +155,12 @@ public void testListTables() throws Exception {
 
 		final List<String> actualTables = executor.listTables(session);
 
-		final List<String> expectedTables = Arrays.asList("TableNumber1", "TableNumber2", "TableSourceSink");
+		final List<String> expectedTables = Arrays.asList(
+			"TableNumber1",
+			"TableNumber2",
+			"TableSourceSink",
+			"TestView1",
+			"TestView2");
 		assertEquals(expectedTables, actualTables);
 	}
 
@@ -264,19 +313,19 @@ public void testBatchQueryExecution() throws Exception {
 		final SessionContext session = new SessionContext("test-session", new Environment());
 
 		try {
-			final ResultDescriptor desc = executor.executeQuery(session, "SELECT IntegerField1 FROM TableNumber1");
+			final ResultDescriptor desc = executor.executeQuery(session, "SELECT * FROM TestView1");
 
 			assertTrue(desc.isMaterialized());
 
 			final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
 
 			final List<String> expectedResults = new ArrayList<>();
-			expectedResults.add("42");
-			expectedResults.add("22");
-			expectedResults.add("32");
-			expectedResults.add("32");
-			expectedResults.add("42");
-			expectedResults.add("52");
+			expectedResults.add("47");
+			expectedResults.add("27");
+			expectedResults.add("37");
+			expectedResults.add("37");
+			expectedResults.add("47");
+			expectedResults.add("57");
 
 			TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
 		} finally {
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 22351bc3959..cd5257e611c 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -83,6 +83,12 @@ tables:
         - name: StringField
           type: VARCHAR
 
+views:
+  - name: TestView1
+    query: SELECT scalarUDF(IntegerField1) FROM TableNumber1
+  - name: TestView2
+    query: SELECT * FROM TestView1
+
 functions:
   - name: scalarUDF
     from: class


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message