storm-commits mailing list archives

From kabh...@apache.org
Subject [1/3] storm git commit: STORM-1443 Support customizing parallelism in StormSQL
Date Fri, 03 Feb 2017 01:26:52 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 02ab70c92 -> a79784010


STORM-1443 Support customizing parallelism in StormSQL

* Add 'PARALLELISM' to the table definition
  * default value is 1
* Set the parallelism hint on the new stream created for a scan
  * downstream operators will also have the same parallelism unless repartitioned
  * parallelism is not applied to the output table, since doing so can trigger a repartition


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d34c2ebf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d34c2ebf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d34c2ebf

Branch: refs/heads/1.x-branch
Commit: d34c2ebf11b92a54fb1b1692ae4a8dae8e0cfb2b
Parents: 02ab70c
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Wed Oct 19 18:25:53 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Feb 3 10:23:18 2017 +0900

----------------------------------------------------------------------
 docs/storm-sql.md                               | 12 +++++--
 external/sql/README.md                          |  7 ++--
 .../storm-sql-core/src/codegen/data/Parser.tdd  |  1 +
 .../src/codegen/includes/parserImpls.ftl        |  4 ++-
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  3 ++
 .../sql/calcite/ParallelStreamableTable.java    | 35 ++++++++++++++++++++
 .../apache/storm/sql/compiler/CompilerUtil.java | 14 +++++++-
 .../apache/storm/sql/parser/SqlCreateTable.java | 24 +++++++++++---
 .../trident/rel/TridentStreamScanRel.java       |  8 +++--
 .../planner/trident/rules/TridentScanRule.java  | 12 +++++--
 10 files changed, 105 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index 3b7d897..b06f068 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -46,11 +46,19 @@ CREATE EXTERNAL TABLE table_name field_list
       OUTPUTFORMAT output_format_classname
     ]
     LOCATION location
+    [ PARALLELISM parallelism ]
     [ TBLPROPERTIES tbl_properties ]
     [ AS select_stmt ]
 ```
 
-You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). For example, the following statement specifies a Kafka spout and sink:
+You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).

+
+`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
+As same as Trident, downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
+
+Default value is 1, and this option is no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.)
+
+For example, the following statement specifies a Kafka spout and sink:
 
 ```
 CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
@@ -159,5 +167,3 @@ LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=
 
 - Windowing is yet to be implemented.
 - Aggregation and join are not supported (waiting for `Streaming SQL` to be matured)
-- Specifying parallelism hints in the topology is not yet supported. 
-  - All processors have a parallelism hint of 1.
\ No newline at end of file
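
To illustrate where the new clause sits, here is a minimal sketch that assembles a `CREATE EXTERNAL TABLE` statement in the documented clause order, with `PARALLELISM` after `LOCATION` and before `TBLPROPERTIES`. `CreateTableDdl` and `render` are hypothetical helpers written for this example, not part of StormSQL; the table name and Kafka location come from the documentation example above.

```java
/** Hypothetical helper: renders a CREATE EXTERNAL TABLE statement in the documented clause order. */
public class CreateTableDdl {

    /** Quotes a value as a SQL string literal, doubling any embedded single quotes. */
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /**
     * Builds the DDL text. parallelism and tblProperties are optional (null means the clause
     * is omitted), mirroring [ PARALLELISM parallelism ] and [ TBLPROPERTIES tbl_properties ].
     */
    public static String render(String tableName, String fieldList, String location,
                                Integer parallelism, String tblProperties) {
        StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE ")
                .append(tableName).append(' ').append(fieldList)
                .append(" LOCATION ").append(quote(location));
        if (parallelism != null) {
            // PARALLELISM goes after LOCATION and before TBLPROPERTIES.
            sb.append(" PARALLELISM ").append(parallelism.intValue());
        }
        if (tblProperties != null) {
            sb.append(" TBLPROPERTIES ").append(quote(tblProperties));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(render("FOO", "(ID INT PRIMARY KEY)",
                "kafka://localhost:2181/brokers?topic=test", 4, null));
    }
}
```

Running `main` prints `CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' PARALLELISM 4`.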

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/README.md
----------------------------------------------------------------------
diff --git a/external/sql/README.md b/external/sql/README.md
index a17f1ff..a4b44fb 100644
--- a/external/sql/README.md
+++ b/external/sql/README.md
@@ -38,6 +38,11 @@ CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/
 The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in
 [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
 
+`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout.
+Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition).
+
+Default value is 1, and this option is no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.)
+
 ## Plugging in External Data Sources
 
 Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using
@@ -177,8 +182,6 @@ LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=
   - Not across batches.
   - Limitation came from `join` feature of Trident.
   - Please refer this doc: `Trident API Overview` for details.
-- Specifying parallelism hints in the topology is not yet supported. 
-  - All processors have a parallelism hint of 1.
 
 ## License
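
The README rule (default of 1 for input sources, no effect on output sources) can be summarized in a small sketch. `ParallelismRule`, `TableRole` and `effectiveParallelism` are hypothetical names for illustration only. As far as this diff shows, the hint is read only in `TridentScanRule`, that is, when a table is scanned, so an output table's `PARALLELISM` value is never consulted.

```java
/** Hypothetical model of how the PARALLELISM option applies per table role. */
public class ParallelismRule {

    /** Whether a table is read from (scanned) or written to (INSERT target). */
    public enum TableRole { INPUT, OUTPUT }

    /** Value used when PARALLELISM is omitted, per the documentation. */
    public static final int DEFAULT_PARALLELISM = 1;

    /**
     * Returns the hint that would be applied for a table, or null when the option
     * has no effect (output tables).
     */
    public static Integer effectiveParallelism(TableRole role, Integer declared) {
        if (role == TableRole.OUTPUT) {
            // Ignored for output data sources, to avoid forcing a repartition.
            return null;
        }
        return declared != null ? declared : DEFAULT_PARALLELISM;
    }
}
```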
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index 79a793a..b0dccb6 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -32,6 +32,7 @@
     "LOCATION",
     "INPUTFORMAT",
     "OUTPUTFORMAT",
+    "PARALLELISM",
     "STORED",
     "TBLPROPERTIES",
     "JAR"

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
index 0013231..4143840 100644
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -61,6 +61,7 @@ SqlNode SqlCreateTable() :
     SqlIdentifier tblName;
     SqlNodeList fieldList;
     SqlNode location;
+    SqlNode parallelism = null;
     SqlNode input_format_class_name = null, output_format_class_name = null;
     SqlNode tbl_properties = null;
     SqlNode select = null;
@@ -77,11 +78,12 @@ SqlNode SqlCreateTable() :
     ]
     <LOCATION>
     location = StringLiteral()
+    [ <PARALLELISM> parallelism = UnsignedNumericLiteral() ]
     [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
     [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
         return new SqlCreateTable(pos, tblName, fieldList,
         input_format_class_name, output_format_class_name, location,
-        tbl_properties, select);
+        parallelism, tbl_properties, select);
     }
 }
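
The grammar change makes `PARALLELISM` an optional clause between `LOCATION` and `TBLPROPERTIES`. The sketch below is a simplified, hand-written model of that ordering, not the generated JavaCC parser: `TailClauseParser` is hypothetical, tokens are assumed to be pre-split strings (literals without quotes), and it accepts only unsigned integers, which is narrower than the `UnsignedNumericLiteral()` production used in the diff.

```java
import java.util.List;

/**
 * Hypothetical model of: LOCATION <string> [ PARALLELISM <number> ] [ TBLPROPERTIES <string> ]
 */
public class TailClauseParser {

    /** Parsed tail of a CREATE TABLE statement; absent clauses are null. */
    public static final class Tail {
        public final String location;
        public final String parallelism; // kept as literal text, like the SqlNode in the diff
        public final String properties;

        Tail(String location, String parallelism, String properties) {
            this.location = location;
            this.parallelism = parallelism;
            this.properties = properties;
        }
    }

    public static Tail parse(List<String> tokens) {
        int i = 0;
        expectKeyword(tokens, i++, "LOCATION");
        String location = tokenAt(tokens, i++);
        String parallelism = null;
        if (isKeyword(tokens, i, "PARALLELISM")) {
            parallelism = tokenAt(tokens, i + 1);
            if (!parallelism.matches("\\d+")) {
                throw new IllegalArgumentException("Expected an unsigned integer: " + parallelism);
            }
            i += 2;
        }
        String properties = null;
        if (isKeyword(tokens, i, "TBLPROPERTIES")) {
            properties = tokenAt(tokens, i + 1);
            i += 2;
        }
        if (i != tokens.size()) {
            // e.g. PARALLELISM after TBLPROPERTIES is out of order and lands here
            throw new IllegalArgumentException("Unexpected token: " + tokens.get(i));
        }
        return new Tail(location, parallelism, properties);
    }

    private static boolean isKeyword(List<String> tokens, int i, String keyword) {
        return i < tokens.size() && tokens.get(i).equalsIgnoreCase(keyword);
    }

    private static void expectKeyword(List<String> tokens, int i, String keyword) {
        if (!isKeyword(tokens, i, keyword)) {
            throw new IllegalArgumentException("Expected " + keyword);
        }
    }

    private static String tokenAt(List<String> tokens, int i) {
        if (i >= tokens.size()) {
            throw new IllegalArgumentException("Unexpected end of input");
        }
        return tokens.get(i);
    }
}
```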
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index b780239..007daa7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -266,6 +266,9 @@ class StormSqlImpl extends StormSql {
       fields.add(new FieldInfo(col.name(), javaType, isPrimary));
     }
 
+    if (n.parallelism() != null) {
+      builder.parallelismHint(n.parallelism());
+    }
     Table table = builder.build();
     schema.add(n.tableName(), table);
     return fields;

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
new file mode 100644
index 0000000..c6b584d
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/calcite/ParallelStreamableTable.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.calcite;
+
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.schema.StreamableTable;
+
+/**
+ * Table that can be converted to a stream. This table also has its parallelism information.
+ *
+ * @see Delta
+ */
+public interface ParallelStreamableTable extends StreamableTable {
+
+    /**
+     * Returns parallelism hint of this table. Returns null if don't know.
+     */
+    Integer parallelismHint();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 1b20aac..2e237c0 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
 import org.apache.storm.sql.parser.ColumnConstraint;
 
 import java.util.ArrayList;
@@ -75,6 +76,7 @@ public class CompilerUtil {
     private final ArrayList<FieldType> fields = new ArrayList<>();
     private final ArrayList<Object[]> rows = new ArrayList<>();
     private int primaryKey = -1;
+    private Integer parallelismHint;
     private SqlMonotonicity primaryKeyMonotonicity;
     private Statistic stats;
 
@@ -110,6 +112,11 @@ public class CompilerUtil {
       return this;
     }
 
+    public TableBuilderInfo parallelismHint(int parallelismHint) {
+      this.parallelismHint = parallelismHint;
+      return this;
+    }
+
     public StreamableTable build() {
       final Statistic stat = buildStatistic();
       final Table tbl = new Table() {
@@ -135,7 +142,12 @@ public class CompilerUtil {
         }
       };
 
-      return new StreamableTable() {
+      return new ParallelStreamableTable() {
+        @Override
+        public Integer parallelismHint() {
+          return parallelismHint;
+        }
+
         @Override
         public Table stream() {
           return tbl;
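
In the diff, the anonymous `ParallelStreamableTable` returns the builder's `parallelismHint` field, so the value is read at call time: if one `TableBuilderInfo` instance were reused and `parallelismHint(...)` called again after `build()`, an already built table would report the new value. The sketch below shows the same pattern with a snapshot taken at build time. `HintedTableBuilder` and `HintedTable` are hypothetical stand-ins, not Storm or Calcite types.

```java
/** Hypothetical, stripped-down builder with an optional parallelism hint. */
public class HintedTableBuilder {

    /** Stand-in for ParallelStreamableTable: exposes a nullable parallelism hint. */
    public interface HintedTable {
        Integer parallelismHint();
    }

    private Integer parallelismHint; // null until explicitly set

    public HintedTableBuilder parallelismHint(int parallelismHint) {
        this.parallelismHint = parallelismHint;
        return this;
    }

    public HintedTable build() {
        // Copy the field into a final local so the built table is unaffected
        // if this builder is modified and reused after build() returns.
        final Integer hint = parallelismHint;
        return new HintedTable() {
            @Override
            public Integer parallelismHint() {
                return hint;
            }
        };
    }
}
```

Returning the field directly, as the diff does, behaves the same as long as each builder instance is used for a single table.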

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
index d810d3a..670eedb 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.Properties;
 
 public class SqlCreateTable extends SqlCall {
+  private static final int DEFAULT_PARALLELISM = 1;
+
   public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
       "CREATE_TABLE", SqlKind.OTHER) {
     @Override
@@ -44,7 +46,7 @@ public class SqlCreateTable extends SqlCall {
         SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
       assert functionQualifier == null;
       return new SqlCreateTable(pos, (SqlIdentifier) o[0], (SqlNodeList) o[1],
-                                o[2], o[3], o[4], o[5], o[6]);
+                                o[2], o[3], o[4], o[5], o[6], o[7]);
     }
 
     @Override
@@ -60,6 +62,9 @@ public class SqlCreateTable extends SqlCall {
             t.outputFormatClass);
       }
       u.keyword("LOCATION").node(t.location);
+      if (t.parallelism != null) {
+        u.keyword("PARALLELISM").node(t.parallelism);
+      }
       if (t.properties != null) {
         u.keyword("TBLPROPERTIES").node(t.properties);
       }
@@ -74,19 +79,21 @@ public class SqlCreateTable extends SqlCall {
   private final SqlNode inputFormatClass;
   private final SqlNode outputFormatClass;
   private final SqlNode location;
+  private final SqlNode parallelism;
   private final SqlNode properties;
   private final SqlNode query;
 
   public SqlCreateTable(
-      SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
-      SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
-      SqlNode properties, SqlNode query) {
+          SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
+          SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
+          SqlNode parallelism, SqlNode properties, SqlNode query) {
     super(pos);
     this.tblName = tblName;
     this.fieldList = fieldList;
     this.inputFormatClass = inputFormatClass;
     this.outputFormatClass = outputFormatClass;
     this.location = location;
+    this.parallelism = parallelism;
     this.properties = properties;
     this.query = query;
   }
@@ -116,6 +123,15 @@ public class SqlCreateTable extends SqlCall {
     return URI.create(getString(location));
   }
 
+  public Integer parallelism() {
+    String parallelismStr = getString(parallelism);
+    if (parallelismStr != null) {
+      return Integer.parseInt(parallelismStr);
+    } else {
+      return DEFAULT_PARALLELISM;
+    }
+  }
+
   public String inputFormatClass() {
     return getString(inputFormatClass);
   }
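
`parallelism()` above falls back to `DEFAULT_PARALLELISM` when the clause is absent, so it never returns null, and the `n.parallelism() != null` check in `StormSqlImpl` is always true. It also delegates to `Integer.parseInt`, which accepts `0` and would throw `NumberFormatException` for a decimal literal such as `2.5` if the grammar admits one. A slightly stricter variant might look like the sketch below; `ParallelismLiteral.parse` is a hypothetical helper, and rejecting values below 1 is an assumption, not behavior of this commit.

```java
/** Hypothetical helper: turns the literal text of a PARALLELISM clause into a validated hint. */
public class ParallelismLiteral {

    public static final int DEFAULT_PARALLELISM = 1;

    /**
     * @param literal the clause's literal text, or null when the clause was omitted
     * @return the parsed hint, or DEFAULT_PARALLELISM when literal is null
     * @throws IllegalArgumentException if the text is not a positive integer
     */
    public static int parse(String literal) {
        if (literal == null) {
            return DEFAULT_PARALLELISM;
        }
        final int value;
        try {
            value = Integer.parseInt(literal.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("PARALLELISM must be an integer: " + literal, e);
        }
        if (value < 1) {
            // Assumed policy: a hint of zero or less is treated as a user error.
            throw new IllegalArgumentException("PARALLELISM must be at least 1: " + literal);
        }
        return value;
    }
}
```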

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
index bc143ec..c563d73 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rel/TridentStreamScanRel.java
@@ -30,8 +30,11 @@ import org.apache.storm.trident.fluent.IAggregatableStream;
 import java.util.Map;
 
 public class TridentStreamScanRel extends StormStreamScanRelBase implements TridentRel {
-    public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+    private final int parallelismHint;
+
+    public TridentStreamScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table, int parallelismHint) {
         super(cluster, traitSet, table);
+        this.parallelismHint = parallelismHint;
     }
 
     @Override
@@ -45,7 +48,8 @@ public class TridentStreamScanRel extends StormStreamScanRelBase implements Trid
         }
 
         String stageName = StormRelUtils.getStageName(this);
-        IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer());
+        IAggregatableStream finalStream = planCreator.getTopology().newStream(stageName, sources.get(sourceName).getProducer())
+                .parallelismHint(parallelismHint);
         planCreator.addStream(finalStream);
     }
 }
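
The scan now sets the hint on the stream returned by `newStream`, and the documentation states that downstream operators keep that parallelism until a repartition (for example an aggregation). The toy model below expresses only that documented rule; it does not use Trident's API, and `ParallelismPropagation` and `Stage` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the documented rule: stages downstream of a scan run with the scan's
 * parallelism until the first repartitioning stage.
 */
public class ParallelismPropagation {

    /** A pipeline stage; repartitioning stages (e.g. aggregations) break the inheritance. */
    public static final class Stage {
        final String name;
        final boolean repartitions;

        public Stage(String name, boolean repartitions) {
            this.name = name;
            this.repartitions = repartitions;
        }
    }

    /** Returns, in order, the names of the stages that share the scan's parallelism. */
    public static List<String> inheritingStages(List<Stage> downstream) {
        List<String> inheriting = new ArrayList<>();
        for (Stage stage : downstream) {
            if (stage.repartitions) {
                break; // from here on, parallelism is no longer tied to the scan
            }
            inheriting.add(stage.name);
        }
        return inheriting;
    }
}
```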

http://git-wip-us.apache.org/repos/asf/storm/blob/d34c2ebf/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
index d863a66..abbd680 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/trident/rules/TridentScanRule.java
@@ -23,11 +23,13 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.schema.Table;
+import org.apache.storm.sql.calcite.ParallelStreamableTable;
 import org.apache.storm.sql.planner.trident.rel.TridentLogicalConvention;
 import org.apache.storm.sql.planner.trident.rel.TridentStreamScanRel;
 
 public class TridentScanRule extends ConverterRule {
   public static final TridentScanRule INSTANCE = new TridentScanRule();
+  public static final int DEFAULT_PARALLELISM_HINT = 1;
 
   private TridentScanRule() {
     super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
@@ -36,13 +38,19 @@ public class TridentScanRule extends ConverterRule {
   @Override
   public RelNode convert(RelNode rel) {
     final TableScan scan = (TableScan) rel;
-    final Table table = scan.getTable().unwrap(Table.class);
+    int parallelismHint = DEFAULT_PARALLELISM_HINT;
+
+    final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
+    if (parallelTable != null && parallelTable.parallelismHint() != null) {
+      parallelismHint = parallelTable.parallelismHint();
+    }
 
+    final Table table = scan.getTable().unwrap(Table.class);
     switch (table.getJdbcTableType()) {
       case STREAM:
         return new TridentStreamScanRel(scan.getCluster(),
             scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
-            scan.getTable());
+            scan.getTable(), parallelismHint);
       default:
         throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
     }
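
The rule's unwrap-and-fallback step can be sketched in isolation: use the table's hint when it implements the parallel interface and reports a value, otherwise use 1. `ScanHintResolver`, `ParallelTable` and `TableHandle` are hypothetical stand-ins for `TridentScanRule`, `ParallelStreamableTable` and Calcite's table handle. The sketch assumes `unwrap` returns null when the table does not implement the requested type, which is how the rule treats it.

```java
/** Hypothetical stand-ins for the unwrap-and-fallback logic in TridentScanRule.convert. */
public class ScanHintResolver {

    public static final int DEFAULT_PARALLELISM_HINT = 1;

    /** Stand-in for ParallelStreamableTable. */
    public interface ParallelTable {
        Integer parallelismHint(); // null when unknown
    }

    /** Stand-in for an unwrappable table handle; returns null for unsupported types. */
    public interface TableHandle {
        <T> T unwrap(Class<T> clazz);
    }

    public static int resolve(TableHandle handle) {
        ParallelTable parallelTable = handle.unwrap(ParallelTable.class);
        // Read the hint once; fall back when the table is not parallel-aware or has no hint.
        Integer hint = parallelTable == null ? null : parallelTable.parallelismHint();
        return hint != null ? hint : DEFAULT_PARALLELISM_HINT;
    }

    /** Convenience handle that exposes target only for compatible types. */
    public static TableHandle handleFor(Object target) {
        return new TableHandle() {
            @Override
            public <T> T unwrap(Class<T> clazz) {
                return clazz.isInstance(target) ? clazz.cast(target) : null;
            }
        };
    }
}
```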

