carbondata-commits mailing list archives

From jack...@apache.org
Subject [3/4] incubator-carbondata git commit: Initial commit
Date Tue, 27 Dec 2016 01:19:10 GMT
Initial commit

Added comments

Fixed style and testcases

Refactored code

Fixed issue

Rebased

Rebased

fixed comments

fixed style


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bedc96d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bedc96d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bedc96d0

Branch: refs/heads/master
Commit: bedc96d059fe25ebec298220b92243e87c496a0c
Parents: 28190eb
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Mon Dec 19 18:57:11 2016 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Tue Dec 27 08:48:59 2016 +0800

----------------------------------------------------------------------
 .../carbondata/examples/CarbonExample.scala     | 173 ----
 .../examples/CarbonSessionExample.scala         | 144 +++
 .../examples/SparkSessionExample.scala          | 173 ++++
 .../catalyst/AbstractCarbonSparkSQLParser.scala | 137 +++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 968 +++++++++++++++++++
 .../execution/command/carbonTableSchema.scala   |   8 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 938 +-----------------
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../spark/sql/CarbonCatalystOperators.scala     |  36 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 133 +++
 .../org/apache/spark/sql/CarbonSource.scala     |  12 +-
 .../execution/CarbonLateDecodeStrategy.scala    |   6 +-
 .../sql/execution/command/DDLStrategy.scala     |  83 ++
 .../execution/command/carbonTableSchema.scala   | 319 +++++-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   9 +
 .../spark/sql/hive/CarbonSessionState.scala     |  38 +
 .../sql/parser/CarbonSpark2SqlParser.scala      | 125 +++
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 178 ++++
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableLoader.scala     |   2 +-
 .../spark/carbondata/util/QueryTest.scala       |   0
 .../sql/common/util/CarbonSessionTest.scala     |   0
 .../spark/sql/common/util/QueryTest.scala       |  13 +-
 28 files changed, 2366 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
deleted file mode 100644
index 273de95..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.examples
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{CleanFiles, ShowSegments}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-
-object CarbonExample {
-
-  def main(args: Array[String]): Unit = {
-    val rootPath = new File(this.getClass.getResource("/").getPath
-        + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
-
-    // clean data folder
-    if (true) {
-      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
-      clean(storeLocation)
-      clean(warehouse)
-      clean(metastoredb)
-    }
-
-    val spark = SparkSession
-        .builder()
-        .master("local")
-        .appName("CarbonExample")
-        .enableHiveSupport()
-        .config("spark.sql.warehouse.dir", warehouse)
-        .config("javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$metastoredb;create=true")
-        .getOrCreate()
-
-    CarbonProperties.getInstance()
-      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
-      .addProperty("carbon.storelocation", storeLocation)
-
-    spark.sparkContext.setLogLevel("WARN")
-
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    // Create table
-    spark.sql(
-      s"""
-         | CREATE TABLE carbon_table(
-         |    shortField short,
-         |    intField int,
-         |    bigintField long,
-         |    doubleField double,
-         |    stringField string,
-         |    timestampField timestamp,
-         |    decimalField decimal(18,2),
-         |    dateField date,
-         |    charField char(5)
-         | )
-         | USING org.apache.spark.sql.CarbonSource
-         | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
-         |   'dbName'='default', 'tableName'='carbon_table')
-       """.stripMargin)
-
-    // val prop = s"$rootPath/conf/dataload.properties.template"
-    // val tableName = "carbon_table"
-    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
-    // TableLoader.main(Array[String](prop, tableName, path))
-
-    spark.sql(
-      s"""
-         | CREATE TABLE csv_table
-         | (  shortField short,
-         |    intField int,
-         |    bigintField long,
-         |    doubleField double,
-         |    stringField string,
-         |    timestampField string,
-         |    decimalField decimal(18,2),
-         |    dateField string,
-         |    charField char(5))
-         |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-       """.stripMargin)
-
-    spark.sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$path'
-         | INTO TABLE csv_table
-       """.stripMargin)
-
-    spark.sql("""
-             SELECT *
-             FROM csv_table
-              """).show
-
-    spark.sql(
-      s"""
-         | INSERT INTO TABLE carbon_table
-         | SELECT shortField, intField, bigintField, doubleField, stringField,
-         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
-         | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
-         | FROM csv_table
-       """.stripMargin)
-
-    spark.sql("""
-             SELECT *
-             FROM carbon_table
-             where stringfield = 'spark' and decimalField > 40
-              """).show
-
-    spark.sql("""
-             SELECT *
-             FROM carbon_table where length(stringField) = 5
-              """).show
-
-    spark.sql("""
-             SELECT *
-             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
-              """).show
-
-    spark.sql("""
-             select count(stringField) from carbon_table
-              """.stripMargin).show
-
-    spark.sql("""
-           SELECT sum(intField), stringField
-           FROM carbon_table
-           GROUP BY stringField
-           """).show
-
-    spark.sql(
-      """
-        |select t1.*, t2.*
-        |from carbon_table t1, carbon_table t2
-        |where t1.stringField = t2.stringField
-      """.stripMargin).show
-
-    spark.sql(
-      """
-        |with t1 as (
-        |select * from carbon_table
-        |union all
-        |select * from carbon_table
-        |)
-        |select t1.*, t2.*
-        |from t1, carbon_table t2
-        |where t1.stringField = t2.stringField
-      """.stripMargin).show
-
-    // Drop table
-    spark.sql("DROP TABLE IF EXISTS carbon_table")
-    spark.sql("DROP TABLE IF EXISTS csv_table")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
new file mode 100644
index 0000000..4923e5b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonSessionExample {
+
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    import org.apache.spark.sql.CarbonSession._
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .appName("CarbonExample")
+      .enableHiveSupport()
+      .config("spark.sql.warehouse.dir", warehouse)
+      .config("javax.jdo.option.ConnectionURL",
+    s"jdbc:derby:;databaseName=$metastoredb;create=true")
+      .getOrCreateCarbonSession()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         |    shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField char(5)
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField')
+       """.stripMargin)
+    // scalastyle:on
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table
+             where stringfield = 'spark' and decimalField > 40
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where length(stringField) = 5
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+              """).show
+
+    spark.sql("""
+             select count(stringField) from carbon_table
+              """.stripMargin).show
+
+    spark.sql("""
+           SELECT sum(intField), stringField
+           FROM carbon_table
+           GROUP BY stringField
+              """).show
+
+    spark.sql(
+      """
+        |select t1.*, t2.*
+        |from carbon_table t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    spark.sql(
+      """
+        |with t1 as (
+        |select * from carbon_table
+        |union all
+        |select * from carbon_table
+        |)
+        |select t1.*, t2.*
+        |from t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+  }
+
+}
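
The bootstrap used in the example above reduces to a few lines. The following is a condensed, illustrative sketch only, not part of this commit's diff: the object name, application name, and store path are placeholders, and it assumes the CarbonSession implicits added by this change are on the classpath.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

import org.apache.carbondata.core.util.CarbonProperties

object CarbonBootstrapSketch {
  def main(args: Array[String]): Unit = {
    // Point CarbonData at a store directory first (the path here is a placeholder).
    CarbonProperties.getInstance()
      .addProperty("carbon.storelocation", "/tmp/carbon/store")

    // The CarbonSession._ import enriches SparkSession.Builder with
    // getOrCreateCarbonSession(), yielding a CarbonData-enabled session.
    val carbon = SparkSession
      .builder()
      .master("local")
      .appName("CarbonBootstrapSketch")
      .getOrCreateCarbonSession()

    // Tables declared with STORED BY 'carbondata' are handled by the Carbon DDL path.
    carbon.sql("DROP TABLE IF EXISTS t1")
    carbon.sql("CREATE TABLE t1(a int, b string) STORED BY 'carbondata'")
  }
}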

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
new file mode 100644
index 0000000..2affbe2
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{CleanFiles, ShowSegments}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object SparkSessionExample {
+
+  def main(args: Array[String]): Unit = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
+
+    val spark = SparkSession
+        .builder()
+        .master("local")
+        .appName("CarbonExample")
+        .enableHiveSupport()
+        .config("spark.sql.warehouse.dir", warehouse)
+        .config("javax.jdo.option.ConnectionURL",
+          s"jdbc:derby:;databaseName=$metastoredb;create=true")
+        .getOrCreate()
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         |    shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2),
+         |    dateField date,
+         |    charField char(5)
+         | )
+         | USING org.apache.spark.sql.CarbonSource
+         | OPTIONS('DICTIONARY_INCLUDE'='dateField, charField',
+         |   'dbName'='default', 'tableName'='carbon_table')
+       """.stripMargin)
+
+    // val prop = s"$rootPath/conf/dataload.properties.template"
+    // val tableName = "carbon_table"
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+    // TableLoader.main(Array[String](prop, tableName, path))
+
+    spark.sql(
+      s"""
+         | CREATE TABLE csv_table
+         | (  shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    timestampField string,
+         |    decimalField decimal(18,2),
+         |    dateField string,
+         |    charField char(5))
+         |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE csv_table
+       """.stripMargin)
+
+    spark.sql("""
+             SELECT *
+             FROM csv_table
+              """).show
+
+    spark.sql(
+      s"""
+         | INSERT INTO TABLE carbon_table
+         | SELECT shortField, intField, bigintField, doubleField, stringField,
+         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField,
+         | cast(to_date(from_unixtime(unix_timestamp(dateField,'yyyy/M/dd'))) as date), charField
+         | FROM csv_table
+       """.stripMargin)
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table
+             where stringfield = 'spark' and decimalField > 40
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where length(stringField) = 5
+              """).show
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table where date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+              """).show
+
+    spark.sql("""
+             select count(stringField) from carbon_table
+              """.stripMargin).show
+
+    spark.sql("""
+           SELECT sum(intField), stringField
+           FROM carbon_table
+           GROUP BY stringField
+           """).show
+
+    spark.sql(
+      """
+        |select t1.*, t2.*
+        |from carbon_table t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    spark.sql(
+      """
+        |with t1 as (
+        |select * from carbon_table
+        |union all
+        |select * from carbon_table
+        |)
+        |select t1.*, t2.*
+        |from t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+    spark.sql("DROP TABLE IF EXISTS csv_table")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
new file mode 100644
index 0000000..fba7976
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/AbstractCarbonSparkSQLParser.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.lexical.StdLexical
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.PackratParsers
+import scala.util.parsing.input.CharArrayReader.EofCh
+
+import org.apache.spark.sql.catalyst.plans.logical._
+
+private[sql] abstract class AbstractCarbonSparkSQLParser
+  extends StandardTokenParsers with PackratParsers {
+
+  def parse(input: String): LogicalPlan = synchronized {
+    // Initialize the Keywords.
+    initLexical
+    phrase(start)(new lexical.Scanner(input)) match {
+      case Success(plan, _) => plan
+      case failureOrError => sys.error(failureOrError.toString)
+    }
+  }
+  /* One-time initialization of lexical. This avoids re-initializing lexical in the parse method. */
+  protected lazy val initLexical: Unit = lexical.initialize(reservedWords)
+
+  protected case class Keyword(str: String) {
+    def normalize: String = lexical.normalizeKeyword(str)
+    def parser: Parser[String] = normalize
+  }
+
+  protected implicit def asParser(k: Keyword): Parser[String] = k.parser
+
+  // By default, use reflection to find the reserved words defined in the subclass.
+  // NOTE: since the Keyword properties are defined by the subclass, this cannot be
+  // evaluated during the parent class's instantiation, because the subclass instance
+  // has not been created yet.
+  protected lazy val reservedWords: Seq[String] =
+    this
+      .getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].normalize)
+
+  // Set the keywords as empty by default; they will be initialized later.
+  override val lexical = new SqlLexical
+
+  protected def start: Parser[LogicalPlan]
+
+  // Returns the whole input string
+  protected lazy val wholeInput: Parser[String] = new Parser[String] {
+    def apply(in: Input): ParseResult[String] =
+      Success(in.source.toString, in.drop(in.source.length()))
+  }
+
+  // Returns the rest of the input string that has not been parsed yet
+  protected lazy val restInput: Parser[String] = new Parser[String] {
+    def apply(in: Input): ParseResult[String] =
+      Success(
+        in.source.subSequence(in.offset, in.source.length()).toString,
+        in.drop(in.source.length()))
+  }
+}
+
+class SqlLexical extends StdLexical {
+  case class FloatLit(chars: String) extends Token {
+    override def toString: String = chars
+  }
+
+  /* This is a workaround to support the lazy setting */
+  def initialize(keywords: Seq[String]): Unit = {
+    reserved.clear()
+    reserved ++= keywords
+  }
+
+  /* Normalize the keyword string */
+  def normalizeKeyword(str: String): String = str.toLowerCase
+
+  delimiters += (
+    "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+    ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
+    )
+
+  protected override def processIdent(name: String) = {
+    val token = normalizeKeyword(name)
+    if (reserved contains token) Keyword(token) else Identifier(name)
+  }
+
+  override lazy val token: Parser[Token] =
+    ( identChar ~ (identChar | digit).* ^^
+      { case first ~ rest => processIdent((first :: rest).mkString) }
+      | digit.* ~ identChar ~ (identChar | digit).* ^^
+        { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) }
+      | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
+      case i ~ None => NumericLit(i.mkString)
+      case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+    }
+      | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
+        { case chars => StringLit(chars mkString "") }
+      | '"' ~> chrExcept('"', '\n', EofCh).* <~ '"' ^^
+        { case chars => StringLit(chars mkString "") }
+      | '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^
+        { case chars => Identifier(chars mkString "") }
+      | EofCh ^^^ EOF
+      | '\'' ~> failure("unclosed string literal")
+      | '"' ~> failure("unclosed string literal")
+      | delim
+      | failure("illegal character")
+      )
+
+  override def identChar: Parser[Elem] = letter | elem('_')
+
+  override def whitespace: Parser[Any] =
+    ( whitespaceChar
+      | '/' ~ '*' ~ comment
+      | '/' ~ '/' ~ chrExcept(EofCh, '\n').*
+      | '#' ~ chrExcept(EofCh, '\n').*
+      | '-' ~ '-' ~ chrExcept(EofCh, '\n').*
+      | '/' ~ '*' ~ failure("unclosed comment")
+      ).*
+}
+
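
To make the contract of this abstract parser concrete, here is a minimal, self-contained sketch of the same keyword-reflection pattern. It is illustrative only and not part of this commit: the object and rule names are hypothetical, it assumes scala-parser-combinators is available, it returns a String rather than a LogicalPlan to stay small, and unlike the SqlLexical above it is case-sensitive.

import scala.language.implicitConversions
import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

object KeywordParserSketch extends StandardTokenParsers with PackratParsers {

  // Keywords are declared as members, mirroring SHOW, TABLES, ... in CarbonDDLSqlParser.
  case class Keyword(str: String)
  val SHOW = Keyword("SHOW")
  val TABLES = Keyword("TABLES")

  // A Keyword can be used wherever a Parser[String] is expected; the String value
  // is lifted to a parser by StandardTokenParsers' implicit keyword conversion.
  implicit def asParser(k: Keyword): Parser[String] = k.str

  // Collect every Keyword member via reflection and register it as a reserved word,
  // just as AbstractCarbonSparkSQLParser does for its subclasses.
  lexical.reserved ++= getClass.getMethods
    .filter(_.getReturnType == classOf[Keyword])
    .map(_.invoke(this).asInstanceOf[Keyword].str)

  // The real parser produces a LogicalPlan; a String result keeps the sketch small.
  lazy val start: Parser[String] = SHOW ~ TABLES ^^^ "show tables"

  def parse(input: String): String =
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(result, _) => result
      case failureOrError => sys.error(failureOrError.toString)
    }

  def main(args: Array[String]): Unit =
    println(parse("SHOW TABLES")) // prints "show tables"
}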

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
new file mode 100644
index 0000000..a5088df
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -0,0 +1,968 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.util.regex.{Matcher, Pattern}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{LinkedHashSet, Map}
+import scala.language.implicitConversions
+import scala.util.matching.Regex
+
+import org.apache.hadoop.hive.ql.lib.Node
+import org.apache.hadoop.hive.ql.parse._
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.execution.command._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * TODO: remove the duplicate code and add the common methods to a common class.
+ * Parser for All Carbon DDL cases
+ */
+abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  protected val AGGREGATE = carbonKeyWord("AGGREGATE")
+  protected val AS = carbonKeyWord("AS")
+  protected val AGGREGATION = carbonKeyWord("AGGREGATION")
+  protected val ALL = carbonKeyWord("ALL")
+  protected val HIGH_CARDINALITY_DIMS = carbonKeyWord("NO_DICTIONARY")
+  protected val BEFORE = carbonKeyWord("BEFORE")
+  protected val BY = carbonKeyWord("BY")
+  protected val CARDINALITY = carbonKeyWord("CARDINALITY")
+  protected val CASCADE = carbonKeyWord("CASCADE")
+  protected val CLASS = carbonKeyWord("CLASS")
+  protected val CLEAN = carbonKeyWord("CLEAN")
+  protected val COLS = carbonKeyWord("COLS")
+  protected val COLUMNS = carbonKeyWord("COLUMNS")
+  protected val COMPACT = carbonKeyWord("COMPACT")
+  protected val CREATE = carbonKeyWord("CREATE")
+  protected val CUBE = carbonKeyWord("CUBE")
+  protected val CUBES = carbonKeyWord("CUBES")
+  protected val DATA = carbonKeyWord("DATA")
+  protected val DATABASE = carbonKeyWord("DATABASE")
+  protected val DATABASES = carbonKeyWord("DATABASES")
+  protected val DELETE = carbonKeyWord("DELETE")
+  protected val DELIMITER = carbonKeyWord("DELIMITER")
+  protected val DESCRIBE = carbonKeyWord("DESCRIBE")
+  protected val DESC = carbonKeyWord("DESC")
+  protected val DETAIL = carbonKeyWord("DETAIL")
+  protected val DIMENSIONS = carbonKeyWord("DIMENSIONS")
+  protected val DIMFOLDERPATH = carbonKeyWord("DIMFOLDERPATH")
+  protected val DROP = carbonKeyWord("DROP")
+  protected val ESCAPECHAR = carbonKeyWord("ESCAPECHAR")
+  protected val EXCLUDE = carbonKeyWord("EXCLUDE")
+  protected val EXPLAIN = carbonKeyWord("EXPLAIN")
+  protected val EXTENDED = carbonKeyWord("EXTENDED")
+  protected val FORMATTED = carbonKeyWord("FORMATTED")
+  protected val FACT = carbonKeyWord("FACT")
+  protected val FIELDS = carbonKeyWord("FIELDS")
+  protected val FILEHEADER = carbonKeyWord("FILEHEADER")
+  protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
+  protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
+  protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
+  protected val FILES = carbonKeyWord("FILES")
+  protected val FROM = carbonKeyWord("FROM")
+  protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
+  protected val IN = carbonKeyWord("IN")
+  protected val INCLUDE = carbonKeyWord("INCLUDE")
+  protected val INPATH = carbonKeyWord("INPATH")
+  protected val INTO = carbonKeyWord("INTO")
+  protected val LEVELS = carbonKeyWord("LEVELS")
+  protected val LIKE = carbonKeyWord("LIKE")
+  protected val LOAD = carbonKeyWord("LOAD")
+  protected val LOCAL = carbonKeyWord("LOCAL")
+  protected val MAPPED = carbonKeyWord("MAPPED")
+  protected val MEASURES = carbonKeyWord("MEASURES")
+  protected val MULTILINE = carbonKeyWord("MULTILINE")
+  protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
+  protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
+  protected val OPTIONS = carbonKeyWord("OPTIONS")
+  protected val OUTPATH = carbonKeyWord("OUTPATH")
+  protected val OVERWRITE = carbonKeyWord("OVERWRITE")
+  protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
+  protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
+  protected val PARTITIONER = carbonKeyWord("PARTITIONER")
+  protected val QUOTECHAR = carbonKeyWord("QUOTECHAR")
+  protected val RELATION = carbonKeyWord("RELATION")
+  protected val SCHEMA = carbonKeyWord("SCHEMA")
+  protected val SCHEMAS = carbonKeyWord("SCHEMAS")
+  protected val SHOW = carbonKeyWord("SHOW")
+  protected val TABLES = carbonKeyWord("TABLES")
+  protected val TABLE = carbonKeyWord("TABLE")
+  protected val TERMINATED = carbonKeyWord("TERMINATED")
+  protected val TYPE = carbonKeyWord("TYPE")
+  protected val USE = carbonKeyWord("USE")
+  protected val WHERE = carbonKeyWord("WHERE")
+  protected val WITH = carbonKeyWord("WITH")
+  protected val AGGREGATETABLE = carbonKeyWord("AGGREGATETABLE")
+  protected val ABS = carbonKeyWord("abs")
+
+  protected val FOR = carbonKeyWord("FOR")
+  protected val SCRIPTS = carbonKeyWord("SCRIPTS")
+  protected val USING = carbonKeyWord("USING")
+  protected val LIMIT = carbonKeyWord("LIMIT")
+  protected val DEFAULTS = carbonKeyWord("DEFAULTS")
+  protected val ALTER = carbonKeyWord("ALTER")
+  protected val ADD = carbonKeyWord("ADD")
+
+  protected val IF = carbonKeyWord("IF")
+  protected val NOT = carbonKeyWord("NOT")
+  protected val EXISTS = carbonKeyWord("EXISTS")
+  protected val DIMENSION = carbonKeyWord("DIMENSION")
+  protected val STARTTIME = carbonKeyWord("STARTTIME")
+  protected val SEGMENTS = carbonKeyWord("SEGMENTS")
+  protected val SEGMENT = carbonKeyWord("SEGMENT")
+
+  protected val STRING = carbonKeyWord("STRING")
+  protected val INTEGER = carbonKeyWord("INTEGER")
+  protected val TIMESTAMP = carbonKeyWord("TIMESTAMP")
+  protected val DATE = carbonKeyWord("DATE")
+  protected val CHAR = carbonKeyWord("CHAR")
+  protected val NUMERIC = carbonKeyWord("NUMERIC")
+  protected val DECIMAL = carbonKeyWord("DECIMAL")
+  protected val DOUBLE = carbonKeyWord("DOUBLE")
+  protected val SHORT = carbonKeyWord("SMALLINT")
+  protected val INT = carbonKeyWord("INT")
+  protected val BIGINT = carbonKeyWord("BIGINT")
+  protected val ARRAY = carbonKeyWord("ARRAY")
+  protected val STRUCT = carbonKeyWord("STRUCT")
+
+  protected val doubleQuotedString = "\"([^\"]+)\"".r
+  protected val singleQuotedString = "'([^']+)'".r
+
+  protected val newReservedWords =
+    this.getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = {
+    val sqllex = new SqlLexical()
+    sqllex.initialize(newReservedWords)
+    sqllex
+
+  }
+
+  import lexical.Identifier
+
+  implicit def regexToParser(regex: Regex): Parser[String] = {
+    acceptMatch(
+    s"identifier matching regex ${ regex }",
+    { case Identifier(str) if regex.unapplySeq(str).isDefined => str }
+    )
+  }
+
+  /**
+   * This will convert a keyword into a case-insensitive regular expression.
+   *
+   * @param keys
+   * @return
+   */
+  private def carbonKeyWord(keys: String) = {
+    ("(?i)" + keys).r
+  }
+
+  protected val escapedIdentifier = "`([^`]+)`".r
+
+  private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
+    var complexDimensions: Seq[Field] = Seq()
+    var dimensions: Seq[Field] = Seq()
+    dims.foreach { dimension =>
+      dimension.dataType.getOrElse("NIL") match {
+        case "Array" => complexDimensions = complexDimensions :+ dimension
+        case "Struct" => complexDimensions = complexDimensions :+ dimension
+        case _ => dimensions = dimensions :+ dimension
+      }
+    }
+    dimensions ++ complexDimensions
+  }
+
+
+
+  def getScaleAndPrecision(dataType: String): (Int, Int) = {
+    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
+    m.find()
+    val matchedString: String = m.group(1)
+    val scaleAndPrecision = matchedString.split(",")
+    (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
+  }
+
+  /**
+   * This will prepare the table model from the parse tree details.
+   *
+   * @param ifNotExistPresent
+   * @param dbName
+   * @param tableName
+   * @param fields
+   * @param partitionCols
+   * @param tableProperties
+   * @return
+   */
+  def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
+      , tableName: String, fields: Seq[Field],
+      partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]): TableModel
+  = {
+
+    fields.zipWithIndex.foreach { x =>
+      x._1.schemaOrdinal = x._2
+    }
+    val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
+      fields, tableProperties)
+    if (dims.isEmpty) {
+      throw new MalformedCarbonCommandException(s"Table ${
+        dbName.getOrElse(
+          CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+      }.$tableName"
+                                                +
+                                                " can not be created without key columns. Please " +
+                                                "use DICTIONARY_INCLUDE or " +
+                                                "DICTIONARY_EXCLUDE to set at least one key " +
+                                                "column " +
+                                                "if all specified columns are numeric types")
+    }
+    val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)
+
+    // column properties
+    val colProps = extractColumnProperties(fields, tableProperties)
+    // get column groups configuration from table properties.
+    val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
+      noDictionaryDims, msrs, dims)
+
+    // get no inverted index columns from table properties.
+    val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
+
+    // validate the tableBlockSize from table properties
+    CommonUtil.validateTableBlockSize(tableProperties)
+
+    TableModel(
+      ifNotExistPresent,
+      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
+      dbName,
+      tableName,
+      tableProperties,
+      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
+      msrs.map(f => normalizeType(f)),
+      Option(noDictionaryDims),
+      Option(noInvertedIdxCols),
+      groupCols,
+      Some(colProps))
+  }
+
+  /**
+   * Extract the column groups configuration from table properties.
+   * Based on this, the row groups of fields will be determined.
+   *
+   * @param tableProperties
+   * @return
+   */
+  protected def updateColumnGroupsInField(tableProperties: Map[String, String],
+      noDictionaryDims: Seq[String],
+      msrs: Seq[Field],
+      dims: Seq[Field]): Seq[String] = {
+    if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
+
+      var splittedColGrps: Seq[String] = Seq[String]()
+      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+
+      // row groups are specified in table properties as "(col1,col2),(col3,col4)";
+      // first split the value on the parentheses so the above becomes two strings:
+      // [col1,col2] [col3,col4]
+      val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
+      while (m.find()) {
+        val oneGroup: String = m.group(1)
+        CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
+        val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
+        splittedColGrps :+= arrangedColGrp
+      }
+      // This will be processed further.
+      CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
+    } else {
+      null
+    }
+  }
+
+  def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
+    // if the columns in a column group are not in schema order, arrange them in schema order
+    var colGrpFieldIndx: Seq[Int] = Seq[Int]()
+    colGroup.split(',').map(_.trim).foreach { x =>
+      dims.zipWithIndex.foreach { dim =>
+        if (dim._1.column.equalsIgnoreCase(x)) {
+          colGrpFieldIndx :+= dim._2
+        }
+      }
+    }
+    // sort it
+    colGrpFieldIndx = colGrpFieldIndx.sorted
+    // check that the columns in the column group are in schema order
+    if (!checkIfInSequence(colGrpFieldIndx)) {
+      throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
+    }
+    def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
+      for (i <- 0 until (colGrpFieldIndx.length - 1)) {
+        if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
+          throw new MalformedCarbonCommandException(
+            "Invalid column group,column in group should be contiguous as per schema.")
+        }
+      }
+      true
+    }
+    val colGrpNames: StringBuilder = StringBuilder.newBuilder
+    for (i <- colGrpFieldIndx.indices) {
+      colGrpNames.append(dims(colGrpFieldIndx(i)).column)
+      if (i < (colGrpFieldIndx.length - 1)) {
+        colGrpNames.append(",")
+      }
+    }
+    colGrpNames.toString()
+  }
+
+  /**
+   * For getting the partitioner Object
+   *
+   * @param partitionCols
+   * @param tableProperties
+   * @return
+   */
+  protected def getPartitionerObject(partitionCols: Seq[PartitionerField],
+      tableProperties: Map[String, String]):
+  Option[Partitioner] = {
+
+    // by default the partition class is set to empty;
+    // later, in the table schema, it is set to the default value.
+    var partitionClass: String = ""
+    var partitionCount: Int = 1
+    var partitionColNames: Array[String] = Array[String]()
+    if (tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).isDefined) {
+      partitionClass = tableProperties.get(CarbonCommonConstants.PARTITIONCLASS).get
+    }
+
+    if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
+      try {
+        partitionCount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
+      } catch {
+        case e: Exception => // no need to do anything.
+      }
+    }
+
+    partitionCols.foreach(col =>
+      partitionColNames :+= col.partitionColumn
+    )
+
+    // this means the user has provided a list of partition columns
+    if (!partitionColNames.isEmpty) {
+      return Option(Partitioner(partitionClass, partitionColNames, partitionCount, null))
+    }
+    // if partition columns are not given, there is no need to partition.
+    None
+  }
+
+  protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):
+  java.util.Map[String, java.util.List[ColumnProperty]] = {
+    val colPropMap = new java.util.HashMap[String, java.util.List[ColumnProperty]]()
+    fields.foreach { field =>
+      if (field.children.isDefined && field.children.get != null) {
+        fillAllChildrenColumnProperty(field.column, field.children, tableProperties, colPropMap)
+      } else {
+        fillColumnProperty(None, field.column, tableProperties, colPropMap)
+      }
+    }
+    colPropMap
+  }
+
+  protected def fillAllChildrenColumnProperty(parent: String, fieldChildren: Option[List[Field]],
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+    fieldChildren.foreach(fields => {
+      fields.foreach(field => {
+        fillColumnProperty(Some(parent), field.column, tableProperties, colPropMap)
+      }
+      )
+    }
+    )
+  }
+
+  protected def fillColumnProperty(parentColumnName: Option[String],
+      columnName: String,
+      tableProperties: Map[String, String],
+      colPropMap: java.util.HashMap[String, java.util.List[ColumnProperty]]) {
+    val (tblPropKey, colProKey) = getKey(parentColumnName, columnName)
+    val colProps = CommonUtil.getColumnProperties(tblPropKey, tableProperties)
+    if (colProps.isDefined) {
+      colPropMap.put(colProKey, colProps.get)
+    }
+  }
+
+  def getKey(parentColumnName: Option[String],
+      columnName: String): (String, String) = {
+    if (parentColumnName.isDefined) {
+      if (columnName == "val") {
+        (parentColumnName.get, parentColumnName.get + "." + columnName)
+      } else {
+        (parentColumnName.get + "." + columnName, parentColumnName.get + "." + columnName)
+      }
+    } else {
+      (columnName, columnName)
+    }
+  }
+
+  /**
+   * This will extract the fields for which the inverted index is disabled.
+   * By default all dimensions use an inverted index.
+   *
+   * @param fields
+   * @param tableProperties
+   * @return
+   */
+  protected def extractNoInvertedIndexColumns(fields: Seq[Field],
+      tableProperties: Map[String, String]): Seq[String] = {
+    // check whether the column name is in fields
+    var noInvertedIdxColsProps: Array[String] = Array[String]()
+    var noInvertedIdxCols: Seq[String] = Seq[String]()
+
+    if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
+      noInvertedIdxColsProps =
+        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
+      noInvertedIdxColsProps.map { noInvertedIdxColProp =>
+          if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
+            val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
+                           " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          }
+        }
+    }
+    // de-duplicate the configured columns so each appears only once
+    val distinctCols = noInvertedIdxColsProps.toSet
+    // extract the no inverted index columns
+    fields.foreach(field => {
+      if (distinctCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        noInvertedIdxCols :+= field.column
+      }
+    }
+    )
+    noInvertedIdxCols
+  }
+
+  /**
+   * This will extract the Dimensions and NoDictionary Dimensions fields.
+   * By default all string cols are dimensions.
+   *
+   * @param fields
+   * @param tableProperties
+   * @return
+   */
+  protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field],
+      tableProperties: Map[String, String]):
+  (Seq[Field], Seq[String]) = {
+    var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
+    var dictExcludeCols: Array[String] = Array[String]()
+    var noDictionaryDims: Seq[String] = Seq[String]()
+    var dictIncludeCols: Seq[String] = Seq[String]()
+
+    // All excluded columns must be present among the CREATE TABLE columns
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+      dictExcludeCols
+        .map { dictExcludeCol =>
+          if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
+            val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
+                           " does not exist in table. Please check create table statement."
+            throw new MalformedCarbonCommandException(errormsg)
+          } else {
+            val dataType = fields.find(x =>
+              x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
+            if (isComplexDimDictionaryExclude(dataType)) {
+              val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
+                             dictExcludeCol
+              throw new MalformedCarbonCommandException(errormsg)
+            } else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
+              val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
+                             " data type column: " + dictExcludeCol
+              throw new MalformedCarbonCommandException(errorMsg)
+            }
+          }
+        }
+    }
+    // All included columns must be present among the CREATE TABLE columns
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludeCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
+      dictIncludeCols.map { distIncludeCol =>
+        if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
+          val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
+                         " does not exist in table. Please check create table statement."
+          throw new MalformedCarbonCommandException(errormsg)
+        }
+      }
+    }
+
+    // a column must not appear in both DICTIONARY_INCLUDE and DICTIONARY_EXCLUDE
+    dictExcludeCols.foreach { dicExcludeCol =>
+      if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) {
+        val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol +
+                       " with DICTIONARY_INCLUDE. Please check create table statement."
+        throw new MalformedCarbonCommandException(errormsg)
+      }
+    }
+
+    // by default, treat all string columns as dimensions; if a column is dictionary-excluded,
+    // also add it to the noDictionaryDims list. All DICTIONARY_EXCLUDE/INCLUDE columns are dimensions.
+    fields.foreach(field => {
+
+      if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) {
+        val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase())
+        if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE ) {
+          noDictionaryDims :+= field.column
+        }
+        dimFields += field
+      } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
+        dimFields += (field)
+      } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
+        dimFields += (field)
+      }
+    }
+    )
+
+    (dimFields.toSeq, noDictionaryDims)
+  }
+
+  /**
+   * It fills non-string dimensions into dimFields.
+   */
+  def fillNonStringDimension(dictIncludeCols: Seq[String],
+      field: Field, dimFields: LinkedHashSet[Field]) {
+    var dictInclude = false
+    if (dictIncludeCols.nonEmpty) {
+      dictIncludeCols.foreach(dictIncludeCol =>
+        if (field.column.equalsIgnoreCase(dictIncludeCol)) {
+          dictInclude = true
+        })
+    }
+    if (dictInclude) {
+      dimFields += field
+    }
+  }
+
+  /**
+   * Detects whether a data type should be treated as a dimension.
+   *
+   * @param dimensionDatatype
+   */
+  def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
+    val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
+    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype))
+  }
+
+  /**
+   * detects whether complex dimension is part of dictionary_exclude
+   */
+  def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
+    val dimensionType = Array("array", "struct")
+    dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
+  }
+
+  /**
+   * detects whether a string or timestamp column is part of dictionary_exclude
+   */
+  def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
+    val dataTypes = Array("string", "timestamp")
+    dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
+  }
+
+  /**
+   * Extract the measure column fields. By default all non-string columns are measures.
+   *
+   * @param fields
+   * @param tableProperties
+   * @return
+   */
+  protected def extractMsrColsFromFields(fields: Seq[Field],
+      tableProperties: Map[String, String]): Seq[Field] = {
+    var msrFields: Seq[Field] = Seq[Field]()
+    var dictIncludedCols: Array[String] = Array[String]()
+    var dictExcludedCols: Array[String] = Array[String]()
+
+    // get all included cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
+      dictIncludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+    }
+
+    // get all excluded cols
+    if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
+      dictExcludedCols =
+        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+    }
+
+    // by default, treat all non-string columns as measures; DICTIONARY_INCLUDE/EXCLUDE columns are dimensions
+    fields.foreach(field => {
+      if (!isDetectAsDimentionDatatype(field.dataType.get)) {
+        if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) &&
+            !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) {
+          msrFields :+= field
+        }
+      }
+    })
+
+    msrFields
+  }
+
+  /**
+   * Extract the DbName and table name.
+   *
+   * @param tableNameParts
+   * @return
+   */
+  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
+    val (db, tableName) =
+      tableNameParts.getChildren.asScala.map {
+        case Token(part, Nil) => cleanIdentifier(part)
+      } match {
+        case Seq(tableOnly) => (None, tableOnly)
+        case Seq(databaseName, table) => (Some(databaseName), table)
+      }
+
+    (db, tableName)
+  }
+
+  protected def cleanIdentifier(ident: String): String = {
+    ident match {
+      case escapedIdentifier(i) => i
+      case plainIdent => plainIdent
+    }
+  }
+
+  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
+    var remainingNodes = nodeList
+    val clauses = clauseNames.map { clauseName =>
+      val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
+      remainingNodes = nonMatches ++ (if (matches.nonEmpty) {
+        matches.tail
+      } else {
+        Nil
+      })
+      matches.headOption
+    }
+
+    if (remainingNodes.nonEmpty) {
+      sys.error(
+        s"""Unhandled clauses:
+            |You are likely trying to use an unsupported carbon feature.""".stripMargin)
+    }
+    clauses
+  }
+
+  object Token {
+    /** @return matches of the form (tokenName, children). */
+    def unapply(t: Any): Option[(String, Seq[ASTNode])] = {
+      t match {
+        case t: ASTNode =>
+          CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
+          Some((t.getText,
+            Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Extract the table properties token
+   *
+   * @param node
+   * @return
+   */
+  protected def getProperties(node: Node): Seq[(String, String)] = {
+    node match {
+      case Token("TOK_TABLEPROPLIST", list) =>
+        list.map {
+          case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+            (unquoteString(key) -> unquoteString(value))
+        }
+    }
+  }
+
+  protected def unquoteString(str: String) = {
+    str match {
+      case singleQuotedString(s) => s.toLowerCase()
+      case doubleQuotedString(s) => s.toLowerCase()
+      case other => other
+    }
+  }
+
+  protected def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
+
+    // validate with all supported options
+    val options = optionList.get.groupBy(x => x._1)
+    val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
+      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
+      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
+    )
+    var isSupported = true
+    val invalidOptions = StringBuilder.newBuilder
+    options.foreach(value => {
+      if (!supportedOptions.exists(x => x.equalsIgnoreCase(value._1))) {
+        isSupported = false
+        invalidOptions.append(value._1)
+      }
+
+    }
+    )
+    if (!isSupported) {
+      val errorMessage = "Error: Invalid option(s): " + invalidOptions.toString()
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    //  COLUMNDICT and ALL_DICTIONARY_PATH can not be used together.
+    if (options.exists(_._1.equalsIgnoreCase("COLUMNDICT")) &&
+        options.exists(_._1.equalsIgnoreCase("ALL_DICTIONARY_PATH"))) {
+      val errorMessage = "Error: COLUMNDICT and ALL_DICTIONARY_PATH can not be used together" +
+                         " in options"
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+
+    if (options.exists(_._1.equalsIgnoreCase("MAXCOLUMNS"))) {
+      val maxColumns: String = options.get("maxcolumns").get(0)._2
+      try {
+        maxColumns.toInt
+      } catch {
+        case ex: NumberFormatException =>
+          throw new MalformedCarbonCommandException(
+            "option MAXCOLUMNS can only contain integer values")
+      }
+    }
+
+    // check for duplicate options
+    val duplicateOptions = options filter {
+      case (_, optionlist) => optionlist.size > 1
+    }
+    val duplicates = StringBuilder.newBuilder
+    if (duplicateOptions.nonEmpty) {
+      duplicateOptions.foreach { x =>
+        duplicates.append(x._1)
+      }
+      val errorMessage = "Error: Duplicate option(s): " + duplicates.toString()
+      throw new MalformedCarbonCommandException(errorMessage)
+    }
+  }
+
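
To make the option validation above concrete, here is a reduced, standalone sketch of the
two checks it performs (unsupported names and duplicates). The option names are taken from
the supportedOptions list; everything else is simplified and not part of the patch.

object OptionCheckSketch extends App {
  val supported = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "MAXCOLUMNS")

  // Group by option name, then reject unknown and duplicated options.
  def check(options: List[(String, String)]): Either[String, Unit] = {
    val grouped = options.groupBy(_._1.toLowerCase)
    val invalid = grouped.keys.filterNot(k => supported.exists(_.equalsIgnoreCase(k)))
    val duplicates = grouped.collect { case (k, vs) if vs.size > 1 => k }
    if (invalid.nonEmpty) Left("Invalid option(s): " + invalid.mkString(","))
    else if (duplicates.nonEmpty) Left("Duplicate option(s): " + duplicates.mkString(","))
    else Right(())
  }

  println(check(List("delimiter" -> ",", "delimiter" -> "|"))) // Left(Duplicate option(s): delimiter)
  println(check(List("badoption" -> "x")))                     // Left(Invalid option(s): badoption)
  println(check(List("fileheader" -> "id,name")))              // Right(())
}
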
+  protected lazy val dbTableIdentifier: Parser[Seq[String]] =
+    (ident <~ ".").? ~ (ident) ^^ {
+      case databaseName ~ tableName =>
+        if (databaseName.isDefined) {
+          Seq(databaseName.get, tableName)
+        } else {
+          Seq(tableName)
+        }
+    }
+
+  protected lazy val loadOptions: Parser[(String, String)] =
+    (stringLit <~ "=") ~ stringLit ^^ {
+      case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
+    }
+
+
+  protected lazy val dimCol: Parser[Field] = anyFieldDef
+
+  protected lazy val primitiveTypes =
+    STRING ^^^ "string" | INTEGER ^^^ "integer" |
+    TIMESTAMP ^^^ "timestamp" | NUMERIC ^^^ "numeric" |
+    BIGINT ^^^ "bigint" | SHORT ^^^ "smallint" |
+    INT ^^^ "int" | DOUBLE ^^^ "double" | decimalType | DATE ^^^ "date" | charType
+
+  /**
+   * Matching the char(10) data type and returning the same.
+   */
+  private lazy val charType =
+    CHAR ~ ("(" ~> numericLit <~ ")").? ^^ {
+      case char ~ Some(digit) =>
+        s"$char($digit)"
+      case char ~ None =>
+        char
+    }
+
+  /**
+   * Matching the decimal(10,0) data type and returning the same.
+   */
+  private lazy val decimalType =
+    DECIMAL ~ ("(" ~> numericLit <~ ",") ~ (numericLit <~ ")") ^^ {
+      case decimal ~ precision ~ scale =>
+        s"$decimal($precision, $scale)"
+    }
+
+  protected lazy val nestedType: Parser[Field] = structFieldType | arrayFieldType |
+                                                 primitiveFieldType
+
+  lazy val anyFieldDef: Parser[Field] =
+    (ident | stringLit) ~ ((":").? ~> nestedType) ~ (IN ~> (ident | stringLit)).? ^^ {
+      case e1 ~ e2 ~ e3 =>
+        Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
+    }
+
+  protected lazy val primitiveFieldType: Parser[Field] =
+    (primitiveTypes) ^^ {
+      case e1 =>
+        Field("unknown", Some(e1), Some("unknown"), Some(null))
+    }
+
+  protected lazy val arrayFieldType: Parser[Field] =
+    ((ARRAY ^^^ "array") ~> "<" ~> nestedType <~ ">") ^^ {
+      case e1 =>
+        Field("unknown", Some("array"), Some("unknown"),
+          Some(List(Field("val", e1.dataType, Some("val"),
+            e1.children))))
+    }
+
+  protected lazy val structFieldType: Parser[Field] =
+    ((STRUCT ^^^ "struct") ~> "<" ~> repsep(anyFieldDef, ",") <~ ">") ^^ {
+      case e1 =>
+        Field("unknown", Some("struct"), Some("unknown"), Some(e1))
+    }
+
+  protected lazy val measureCol: Parser[Field] =
+    (ident | stringLit) ~ (INTEGER ^^^ "integer" | NUMERIC ^^^ "numeric" | SHORT ^^^ "smallint" |
+                           BIGINT ^^^ "bigint" | DECIMAL ^^^ "decimal").? ~
+    (AS ~> (ident | stringLit)).? ~ (IN ~> (ident | stringLit)).? ^^ {
+      case e1 ~ e2 ~ e3 ~ e4 => Field(e1, e2, e3, Some(null))
+    }
+
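
The field and type parsers above are ordinary scala-parser-combinators compositions. The
following standalone sketch reproduces the decimal(p, s) / char(n) matching done by
decimalType and charType, using plain string keywords instead of the Carbon lexical
parsers; it is illustrative only and not wired to the Carbon Field model.

import scala.util.parsing.combinator.JavaTokenParsers

object TypeSketch extends JavaTokenParsers {
  // decimal(precision, scale), echoed back as a normalized string
  val decimalType: Parser[String] =
    "decimal" ~ ("(" ~> wholeNumber <~ ",") ~ (wholeNumber <~ ")") ^^ {
      case d ~ p ~ s => s"$d($p, $s)"
    }
  // char with an optional length
  val charType: Parser[String] =
    "char" ~ ("(" ~> wholeNumber <~ ")").? ^^ {
      case c ~ Some(n) => s"$c($n)"
      case c ~ None    => c
    }

  def main(args: Array[String]): Unit = {
    println(parseAll(decimalType, "decimal(10, 2)")) // parsed: decimal(10, 2)
    println(parseAll(charType, "char(5)"))           // parsed: char(5)
  }
}
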
+  private def normalizeType(field: Field): Field = {
+    val dataType = field.dataType.getOrElse("NIL")
+    dataType match {
+      case "string" =>
+        Field(field.column, Some("String"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "smallint" =>
+        Field(field.column, Some("SmallInt"), field.name, Some(null),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "integer" | "int" =>
+        Field(field.column, Some("Integer"), field.name, Some(null),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "timestamp" =>
+        Field(field.column, Some("Timestamp"), field.name, Some(null),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "array" =>
+        Field(field.column, Some("Array"), field.name,
+          field.children.map(f => f.map(normalizeType(_))),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "struct" =>
+        Field(field.column, Some("Struct"), field.name,
+          field.children.map(f => f.map(normalizeType(_))),
+          field.parent, field.storeType, field.schemaOrdinal,
+          field.precision, field.scale, field.rawSchema)
+      case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
+      )
+      // if the data type is a parameterized decimal such as decimal(10,0) (as produced
+      // for nested children), extract the precision and scale and reset the data type
+      // to Decimal.
+      case _ if (dataType.startsWith("decimal")) =>
+        val (precision, scale) = getScaleAndPrecision(dataType)
+        Field(field.column,
+          Some("Decimal"),
+          field.name,
+          Some(null),
+          field.parent,
+          field.storeType, field.schemaOrdinal, precision,
+          scale,
+          field.rawSchema
+        )
+      case _ =>
+        field
+    }
+  }
+
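
A reduced sketch of the mapping normalizeType performs, collapsed to plain strings so it can
run without the Field case class; the real method also carries precision, scale and child
fields through, and handles array/struct recursively.

object NormalizeSketch extends App {
  def normalize(raw: String): String = raw match {
    case "string"          => "String"
    case "integer" | "int" => "Integer"
    case "bigint"          => "BigInt"
    case d if d.startsWith("decimal") => "Decimal" // precision/scale extracted separately
    case other             => other                // unknown types pass through unchanged
  }

  println(normalize("int"))            // Integer
  println(normalize("decimal(10,2)"))  // Decimal
}
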
+  private def addParent(field: Field): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case "Array" => Field(field.column, Some("Array"), field.name,
+        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+        field.storeType, field.schemaOrdinal)
+      case "Struct" => Field(field.column, Some("Struct"), field.name,
+        field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
+        field.storeType, field.schemaOrdinal)
+      case _ => field
+    }
+  }
+
+  private def appendParentForEachChild(field: Field, parentName: String): Field = {
+    field.dataType.getOrElse("NIL") match {
+      case "String" => Field(parentName + "." + field.column, Some("String"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Long" => Field(parentName + "." + field.column, Some("Long"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Double" => Field(parentName + "." + field.column, Some("Double"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Timestamp" => Field(parentName + "." + field.column, Some("Timestamp"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Numeric" => Field(parentName + "." + field.column, Some("Numeric"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Array" => Field(parentName + "." + field.column, Some("Array"),
+        Some(parentName + "." + field.name.getOrElse(None)),
+        field.children
+          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+        parentName)
+      case "Struct" => Field(parentName + "." + field.column, Some("Struct"),
+        Some(parentName + "." + field.name.getOrElse(None)),
+        field.children
+          .map(f => f.map(appendParentForEachChild(_, parentName + "." + field.column))),
+        parentName)
+      case "BigInt" => Field(parentName + "." + field.column, Some("BigInt"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName)
+      case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale)
+      case _ => field
+    }
+  }
+
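
To make the parent-prefixing above concrete, here is a toy model of what addParent and
appendParentForEachChild do: every child column of a complex (Array/Struct) field is renamed
to parent.child, recursively. ToyField is illustrative and not the Carbon Field case class.

object ParentPrefixSketch extends App {
  case class ToyField(column: String, children: List[ToyField] = Nil)

  // Rename a child to parent.child and recurse into its own children.
  def prefix(f: ToyField, parent: String): ToyField =
    ToyField(s"$parent.${f.column}", f.children.map(prefix(_, s"$parent.${f.column}")))

  val struct = ToyField("address", List(ToyField("city"), ToyField("pin")))
  val renamed = struct.copy(children = struct.children.map(prefix(_, struct.column)))
  println(renamed.children.map(_.column)) // List(address.city, address.pin)
}
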
+  protected lazy val segmentId: Parser[String] =
+    numericLit ^^ { u => u } |
+    elem("decimal", p => {
+      p.getClass.getSimpleName.equals("FloatLit") ||
+      p.getClass.getSimpleName.equals("DecimalLit")
+    }) ^^ (_.chars)
+}
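
For orientation, a usage-level sketch of the kind of statement this DDL parser ultimately
serves. The path, table name and header below are placeholders, not taken from the patch;
a Carbon-enabled SparkSession would submit the string through spark.sql(load).

// Hypothetical LOAD DATA statement whose OPTIONS(...) list is parsed by loadOptions
// and checked by validateOptions above.
val load =
  """
    | LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE test_table
    | OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"', 'FILEHEADER'='id,name')
  """.stripMargin
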

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bedc96d0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 531b691..f646f1d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -56,7 +56,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
     children: Option[List[Field]], parent: String = null,
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
-    var precision: Int = 0, var scale: Int = 0)
+    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "")
 
 case class ColumnProperty(key: String, value: String)
 
@@ -108,12 +108,12 @@ case class CompactionCallableModel(storePath: String,
     compactionType: CompactionType)
 
 object TableNewProcessor {
-  def apply(cm: TableModel, sqlContext: SQLContext): TableInfo = {
-    new TableNewProcessor(cm, sqlContext).process
+  def apply(cm: TableModel): TableInfo = {
+    new TableNewProcessor(cm).process
   }
 }
 
-class TableNewProcessor(cm: TableModel, sqlContext: SQLContext) {
+class TableNewProcessor(cm: TableModel) {
 
   var index = 0
   var rowGroup = 0

