flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] danny0405 commented on a change in pull request #8548: [FLINK-6962] [table] Add a create table SQL DDL
Date Wed, 29 May 2019 07:38:08 GMT
danny0405 commented on a change in pull request #8548: [FLINK-6962] [table] Add a create table
SQL DDL
URL: https://github.com/apache/flink/pull/8548#discussion_r288430117
 
 

 ##########
 File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##########
 @@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.calcite.SqlParserTest;
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.calcite.sql.SqlNode;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/** FlinkSqlParserImpl tests. **/
+public class FlinkSqlParserImplTest extends SqlParserTest {
+
+	/**
+	 * Passes a CREATE TABLE statement to {@code check} together with the expected output text.
+	 *
+	 * <p>The statement declares plain, header and computed columns, a primary key, a named
+	 * bounded-delay watermark, a PARTITIONED BY clause and a WITH property list.
+	 */
+	@Test
+	public void testCreateTable() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  h varchar header, \n" +
+			"  g as 2 * (a + 1), \n" +
+			"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  b varchar,\n" +
+			"  proc as PROCTIME(), \n" +
+			"  PRIMARY KEY (a, b), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"PARTITIONED BY (a, h)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `h`  VARCHAR HEADER,\n" +
+			"  `g` AS (2 * (`a` + 1)),\n" +
+			"  `ts` AS `toTimestamp`(`b`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `proc` AS `PROCTIME`(),\n" +
+			"  PRIMARY KEY (`a`, `b`),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"PARTITIONED BY (`a`, `h`)\n" +
+			"WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement carrying both a column-level and a table-level
+	 * {@code comment} clause to {@code check}, together with the expected output text in
+	 * which both appear as upper-case {@code COMMENT} clauses.
+	 */
+	@Test
+	public void testCreateTableWithComment() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint comment 'test column comment AAA.',\n" +
+			"  h varchar header, \n" +
+			"  g as 2 * (a + 1), \n" +
+			"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  b varchar,\n" +
+			"  proc as PROCTIME(), \n" +
+			"  PRIMARY KEY (a, b), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"comment 'test table comment ABC.'\n" +
+			"PARTITIONED BY (a, h)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT  COMMENT 'test column comment AAA.',\n" +
+			"  `h`  VARCHAR HEADER,\n" +
+			"  `g` AS (2 * (`a` + 1)),\n" +
+			"  `ts` AS `toTimestamp`(`b`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `proc` AS `PROCTIME`(),\n" +
+			"  PRIMARY KEY (`a`, `b`),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"COMMENT 'test table comment ABC.'\n" +
+			"PARTITIONED BY (`a`, `h`)\n" +
+			"WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement that declares both a {@code PRIMARY KEY (a, b)} and a
+	 * {@code UNIQUE (h, g)} constraint to {@code check}, together with the expected output
+	 * text that lists both constraints with backtick-quoted column names.
+	 */
+	@Test
+	public void testCreateTableWithPrimaryKeyAndUniqueKey() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint comment 'test column comment AAA.',\n" +
+			"  h varchar header, \n" +
+			"  g as 2 * (a + 1), \n" +
+			"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  b varchar,\n" +
+			"  proc as PROCTIME(), \n" +
+			"  PRIMARY KEY (a, b), \n" +
+			"  UNIQUE (h, g), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"comment 'test table comment ABC.'\n" +
+			"PARTITIONED BY (a, h)\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT  COMMENT 'test column comment AAA.',\n" +
+			"  `h`  VARCHAR HEADER,\n" +
+			"  `g` AS (2 * (`a` + 1)),\n" +
+			"  `ts` AS `toTimestamp`(`b`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `proc` AS `PROCTIME`(),\n" +
+			"  PRIMARY KEY (`a`, `b`),\n" +
+			"  UNIQUE (`h`, `g`),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"COMMENT 'test table comment ABC.'\n" +
+			"PARTITIONED BY (`a`, `h`)\n" +
+			"WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose WATERMARK clause has no name between
+	 * {@code WATERMARK} and {@code FOR} to {@code check}, together with the expected output
+	 * text, which likewise carries no watermark name.
+	 */
+	@Test
+	public void testCreateTableWithoutWatermarkFieldName() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK FOR `a` AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose watermark is declared as
+	 * {@code BOUNDED WITH DELAY 1000 DAY} to {@code check}, together with the expected
+	 * output text that keeps the {@code DAY} unit.
+	 */
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 DAY\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 DAY\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose watermark is declared as
+	 * {@code BOUNDED WITH DELAY 1000 HOUR} to {@code check}, together with the expected
+	 * output text that keeps the {@code HOUR} unit.
+	 */
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay1() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 HOUR\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 HOUR\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose watermark is declared as
+	 * {@code BOUNDED WITH DELAY 1000 MINUTE} to {@code check}, together with the expected
+	 * output text that keeps the {@code MINUTE} unit.
+	 */
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay2() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MINUTE\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 MINUTE\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose watermark is declared as
+	 * {@code BOUNDED WITH DELAY 1000 SECOND} to {@code check}, together with the expected
+	 * output text that keeps the {@code SECOND} unit.
+	 */
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay3() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 SECOND\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK `wk` FOR `a` AS BOUNDED WITH DELAY 1000 SECOND\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement with a negative watermark delay ({@code -1000 SECOND})
+	 * to {@code checkFails}, together with a regular expression for the expected error text.
+	 *
+	 * <p>The pattern expects an unexpected {@code "-"} at line 5, column 44, where an
+	 * {@code <UNSIGNED_INTEGER_LITERAL>} was expected. The {@code ^-^} carets in the
+	 * statement presumably mark the expected error position for {@code checkFails};
+	 * confirm against the parent test class.
+	 */
+	@Test
+	public void testCreateTableWithNegativeWatermarkOffsetDelay() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS BOUNDED WITH DELAY ^-^1000 SECOND\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expectedErrorPattern = "(?s).*Encountered \"-\" at line 5, column 44.\n" +
+			"Was expecting:\n" +
+			"    <UNSIGNED_INTEGER_LITERAL> ...\n" +
+			".*";
+		checkFails(ddl, expectedErrorPattern);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose watermark is declared with the
+	 * {@code ASCENDING} strategy to {@code check}, together with the expected output text
+	 * that keeps the {@code ASCENDING} keyword.
+	 */
+	@Test
+	public void testCreateTableWithWatermarkStrategyAscending() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS ASCENDING\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK `wk` FOR `a` AS ASCENDING\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	/**
+	 * Passes a CREATE TABLE statement whose watermark is declared with the
+	 * {@code FROM_SOURCE} strategy to {@code check}, together with the expected output text
+	 * that keeps the {@code FROM_SOURCE} keyword.
+	 */
+	@Test
+	public void testCreateTableWithWatermarkStrategyFromSource() {
+		final String ddl = "CREATE TABLE tbl1 (\n" +
+			"  a bigint,\n" +
+			"  b varchar, \n" +
+			"  c as 2 * (a + 1), \n" +
+			"  WATERMARK wk FOR a AS FROM_SOURCE\n" +
+			")\n" +
+			"  with (\n" +
+			"    connector = 'kafka', \n" +
+			"    kafka.topic = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `tbl1` (\n" +
+			"  `a`  BIGINT,\n" +
+			"  `b`  VARCHAR,\n" +
+			"  `c` AS (2 * (`a` + 1)),\n" +
+			"  WATERMARK `wk` FOR `a` AS FROM_SOURCE\n" +
+			") WITH (\n" +
+			"  `connector` = 'kafka',\n" +
+			"  `kafka`.`topic` = 'log.test'\n" +
+			")";
+		check(ddl, expected);
+	}
+
+	@Test
+	public void testCreateTableWithComplexType() {
+		check("CREATE TABLE tbl1 (\n" +
+			"  a ARRAY<bigint>, \n" +
+			"  b MAP<int, varchar>,\n" +
+			"  c STRUCT<cc0:int, cc1: float, cc2: varchar>,\n" +
+			"  PRIMARY KEY (a, b) \n" +
+			") with (\n" +
+			"  x = 'y', \n" +
+			"  asd = 'data'\n" +
+			")\n", "CREATE TABLE `tbl1` (\n" +
+			"  `a`  ARRAY< BIGINT >,\n" +
+			"  `b`  MAP< INTEGER, VARCHAR >,\n" +
+			"  `c`  STRUCT< `cc0` : INTEGER, `cc1` : FLOAT, `cc2` : VARCHAR >,\n" +
 
 Review comment:
   I agree, added

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message