Repository: incubator-carbondata
Updated Branches:
refs/heads/master 28190eb71 -> e8dcd4296
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
new file mode 100644
index 0000000..9a3f828
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parser
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.catalog.CatalogColumn
+import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext,
+ TablePropertyListContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel}
+import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Concrete parser for Spark SQL statements and carbon specific statements
+ */
+class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+
+ val astBuilder = new CarbonSqlAstBuilder(conf)
+
+ private val substitutor = new VariableSubstitution(conf)
+
+ protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+ super.parse(substitutor.substitute(command))(toResult)
+ }
+
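+ // Try the Spark SQL grammar first; if it rejects the statement, fall back to
+ // the carbon DDL parser (CarbonSpark2SqlParser) for carbon-specific commands.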
+ override def parsePlan(sqlText: String): LogicalPlan = {
+ try {
+ super.parsePlan(sqlText)
+ } catch {
+ case e: Throwable =>
+ astBuilder.parser.parse(sqlText)
+ }
+ }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+
+ val parser = new CarbonSpark2SqlParser
+
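+ // Intercept CREATE TABLE statements that use STORED BY 'carbondata' (or
+ // 'org.apache.carbondata.format') and build a carbon TableModel; every other
+ // CREATE TABLE statement is delegated to Spark's own builder.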
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ val fileStorage = Option(ctx.createFileFormat) match {
+ case Some(value) => value.storageHandler().STRING().getSymbol.getText
+ case _ => ""
+ }
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
+ // TODO: implement temporary tables
+ if (temp) {
+ throw new ParseException(
+ "CREATE TEMPORARY TABLE is not supported yet. " +
+ "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+ }
+ if (ctx.skewSpec != null) {
+ operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
+ }
+ if (ctx.bucketSpec != null) {
+ operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
+ }
+ val comment = Option(ctx.STRING).map(string)
+ val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
+ val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
+ val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
+ .getOrElse(Map.empty)
+
+ // Ensure that no duplicate column name is used in the table definition
+ val colNames = cols.map(_.name)
+ if (colNames.length != colNames.distinct.length) {
+ val duplicateColumns = colNames.groupBy(identity).collect {
+ case (x, ys) if ys.length > 1 => "\"" + x + "\""
+ }
+ operationNotAllowed(s"Duplicated column names found in table definition of $name:
" +
+ duplicateColumns.mkString("[", ",", "]"), ctx)
+ }
+
+ // For Hive tables, partition columns must not be part of the schema
+ val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
+ if (badPartCols.nonEmpty) {
+ operationNotAllowed(s"Partition columns may not be specified in the schema: " +
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+ }
+
+ // Note: Hive requires partition columns to be distinct from the schema, so we need
+ // to include the partition columns here explicitly
+ val schema = cols ++ partitionCols
+
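+ // re-parse each "<name> <dataType>" pair with the carbon DDL field parser to
+ // obtain a carbon Field, normalising decimal and char types along the way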
+ val fields = schema.map { col =>
+ val x = col.name + ' ' + col.dataType
+ val f: Field = parser.anyFieldDef(new parser.lexical.Scanner(x)) match {
+ case parser.Success(field, _) => field.asInstanceOf[Field]
+ case failureOrError => throw new MalformedCarbonCommandException(
+ s"Unsupported data type: $col.getType")
+ }
+ // a decimal type arrives as a string like decimal(10,0), so detect it by
+ // prefix, extract the precision and scale, and reset the data type to
+ // plain "decimal"
+ if (f.dataType.getOrElse("").startsWith("decimal")) {
+ val (precision, scale) = parser.getScaleAndPrecision(col.dataType)
+ f.precision = precision
+ f.scale = scale
+ f.dataType = Some("decimal")
+ }
+ if (f.dataType.getOrElse("").startsWith("char")) {
+ f.dataType = Some("char")
+ }
+ f.rawSchema = x
+ f
+ }
+
+ // validate tblProperties
+ if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) {
+ throw new MalformedCarbonCommandException("Invalid table properties")
+ }
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+ name.database,
+ name.table,
+ fields,
+ Seq(),
+ properties.asJava.asScala)
+
+ CreateTable(tableModel)
+ } else {
+ super.visitCreateTable(ctx)
+ }
+ }
+
+ /**
+ * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
+ */
+ private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+ val props = visitTablePropertyList(ctx)
+ val badKeys = props.filter { case (_, v) => v == null }.keys
+ if (badKeys.nonEmpty) {
+ operationNotAllowed(
+ s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx)
+ }
+ props
+ }
+
+ private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = {
+ withOrigin(ctx) {
+ ctx.colType.asScala.map { col =>
+ CatalogColumn(
+ col.identifier.getText.toLowerCase,
+ // Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
+ // just convert the whole type string to lower case, otherwise the struct field names
+ // will no longer be case sensitive. Instead, we rely on our parser to get the proper
+ // case before passing it to Hive.
+ typedVisit[DataType](col.dataType).catalogString,
+ nullable = true,
+ Option(col.STRING).map(string))
+ }
+ }
+ }
+}
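A minimal sketch of how the new parser can be driven outside a SparkSession, assuming the private[sql] SQLConf constructor is reachable from the same package; the object name and the CREATE TABLE statement below are illustrative only, not part of this patch:

    // hypothetical usage sketch (illustrative, not part of this patch)
    package org.apache.spark.sql.parser

    import org.apache.spark.sql.internal.SQLConf

    object CarbonParserExample {
      def main(args: Array[String]): Unit = {
        val parser = new CarbonSparkSqlParser(new SQLConf)
        // STORED BY 'carbondata' routes the statement through CarbonSqlAstBuilder,
        // which produces a CreateTable(tableModel) logical plan
        val plan = parser.parsePlan(
          "CREATE TABLE t1(id INT, name STRING) STORED BY 'carbondata'")
        println(plan)
      }
    }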
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index c84882e..399b3e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -43,7 +43,7 @@ object CleanFiles {
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
cleanFiles(spark, dbName, tableName, storePath)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 1e891fd..2db6e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -41,7 +41,7 @@ object Compaction {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index ae95bf6..951cd7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -43,7 +43,7 @@ object DeleteSegmentByDate {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index d5a6861..dad9f59 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -48,7 +48,7 @@ object DeleteSegmentById {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 1a02c8c..c953089 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -75,7 +75,7 @@ object ShowSegments {
None
}
val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
val rows = showSegments(spark, dbName, tableName, limit)
System.out.println(showString(rows))
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 8b10aa4..424d8fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -86,7 +86,7 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
- CarbonEnv.init(spark.sqlContext)
+ CarbonEnv.init(spark)
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/util/QueryTest.scala
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/CarbonSessionTest.scala
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 45dcb03..4310d04 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -54,6 +54,11 @@ class QueryTest extends PlanTest {
clean(metastoredb)
}
+ CarbonProperties.getInstance()
+ .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+ .addProperty("carbon.storelocation", storeLocation)
+
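+ // the CarbonSession import below provides getOrCreateCarbonSession() on SparkSession.Builder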
+ import org.apache.spark.sql.CarbonSession._
val spark = SparkSession
.builder()
.master("local")
@@ -62,17 +67,13 @@ class QueryTest extends PlanTest {
.config("spark.sql.warehouse.dir", warehouse)
.config("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastoredb;create=true")
- .getOrCreate()
-
- CarbonProperties.getInstance()
- .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
- .addProperty("carbon.storelocation", storeLocation)
+ .getOrCreateCarbonSession()
spark.sparkContext.setLogLevel("WARN")
spark
}
- val sc = spark.sparkContext
+ val sc = spark.sparkContext
lazy val implicits = spark.implicits