carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1822][Spark-Integration] Support DDL to register the CarbonData table from existing carbon table data
Date Thu, 07 Dec 2017 13:22:45 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master f0f4d7d09 -> 30457c415


[CARBONDATA-1822][Spark-Integration] Support DDL to register the CarbonData table from existing carbon table data

Problem:
Support a DDL command to create a table from existing carbon table data.
Solution:
The existing Spark REFRESH TABLE DDL command is overridden in Carbon to support
registering a carbon table with the Hive metastore:
REFRESH TABLE <db_name>.<table_name>;
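
For illustration, a minimal usage sketch (database, table and location names are
hypothetical); the table directory must already contain the carbon data and its
schema file:

  sql("create database carbon location '/user/hive/carbonstore'")
  sql("use carbon")
  // copy an existing carbon table directory under the database location, then:
  sql("refresh table carbontable")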

This closes #1583


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/30457c41
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/30457c41
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/30457c41

Branch: refs/heads/master
Commit: 30457c41511ed22c9980cfe575e3f277cc7b4c33
Parents: f0f4d7d
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Thu Nov 23 20:02:14 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Dec 7 18:52:30 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../ThriftWrapperSchemaConverterImpl.java       |   7 +-
 .../carbondata/hadoop/util/SchemaReader.java    |  37 ++-
 .../carbondata/events/RefreshTableEvents.scala  |  36 +++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  47 ++--
 .../command/carbonTableSchemaCommon.scala       |  30 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |  47 ++++
 .../org/apache/spark/sql/CarbonSource.scala     |   4 +-
 .../management/RefreshCarbonTableCommand.scala  | 208 ++++++++++++++
 .../CreatePreAggregateTableCommand.scala        |   4 +-
 .../table/CarbonCreateTableCommand.scala        |  33 ++-
 .../sql/execution/strategy/DDLStrategy.scala    |  12 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   2 -
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  11 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   3 +
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   7 +-
 .../register/TestRegisterCarbonTable.scala      | 281 +++++++++++++++++++
 17 files changed, 694 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 72d8b0c..5fb08a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1380,6 +1380,11 @@ public final class CarbonCommonConstants {
    */
   public static final String TABLE_COMMENT = "comment";
 
+  /**
+   * this will be used to provide the comment for a column
+   */
+  public static final String COLUMN_COMMENT = "comment";
+
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index c1e68da..945a40c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -194,9 +194,12 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     thriftColumnSchema.setColumnReferenceId(wrapperColumnSchema.getColumnReferenceId());
     thriftColumnSchema.setSchemaOrdinal(wrapperColumnSchema.getSchemaOrdinal());
     if (wrapperColumnSchema.isSortColumn()) {
-      Map<String, String> properties = new HashMap<String, String>();
+      Map<String, String> properties = wrapperColumnSchema.getColumnProperties();
+      if (null == properties) {
+        properties = new HashMap<String, String>();
+        thriftColumnSchema.setColumnProperties(properties);
+      }
       properties.put(CarbonCommonConstants.SORT_COLUMNS, "true");
-      thriftColumnSchema.setColumnProperties(properties);
     }
     thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getAggFunction());
     if (null != wrapperColumnSchema.getTimeSeriesFunction() && !wrapperColumnSchema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index 47afc9c..c0f8816 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -21,16 +21,15 @@ import java.io.IOException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
-import org.apache.thrift.TBase;
-
 /**
  * TODO: It should be removed after store manager implementation.
  */
@@ -46,18 +45,8 @@ public class SchemaReader {
         FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
       String tableName = identifier.getCarbonTableIdentifier().getTableName();
 
-      ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
-        public TBase create() {
-          return new org.apache.carbondata.format.TableInfo();
-        }
-      };
-      ThriftReader thriftReader =
-          new ThriftReader(carbonTablePath.getSchemaFilePath(), createTBase);
-      thriftReader.open();
       org.apache.carbondata.format.TableInfo tableInfo =
-          (org.apache.carbondata.format.TableInfo) thriftReader.read();
-      thriftReader.close();
-
+          CarbonUtil.readSchemaFile(carbonTablePath.getSchemaFilePath());
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
           tableInfo,
@@ -71,4 +60,24 @@ public class SchemaReader {
       throw new IOException("File does not exist: " + schemaFilePath);
     }
   }
+  /**
+   * the method returns the Wrapper TableInfo
+   *
+   * @param absoluteTableIdentifier
+   * @return
+   */
+  public static TableInfo getTableInfo(AbsoluteTableIdentifier absoluteTableIdentifier)
+      throws IOException {
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        CarbonUtil.readSchemaFile(carbonTablePath.getSchemaFilePath());
+    ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+        new ThriftWrapperSchemaConverterImpl();
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    TableInfo tableInfo = thriftWrapperSchemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName(),
+            carbonTableIdentifier.getTableName(), absoluteTableIdentifier.getTablePath());
+    return tableInfo;
+  }
 }
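
For reference, a minimal caller sketch for the new SchemaReader.getTableInfo (the
path and names below are hypothetical; the location must contain a carbon schema
file):

  import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
  import org.apache.carbondata.hadoop.util.SchemaReader

  val identifier = AbsoluteTableIdentifier.from("/store/carbon/carbontable",
    "carbon", "carbontable")
  // reads the thrift schema file and converts it to the wrapper TableInfo
  val tableInfo = SchemaReader.getTableInfo(identifier)
  println(tableInfo.getFactTable.getTableName)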

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala
new file mode 100644
index 0000000..662f9e7
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/RefreshTableEvents.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql._
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * Class for handling operations before start of a table registration process.
+ * Example usage: For validation purpose
+ */
+case class RefreshTablePreExecutionEvent(sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo
+
+/**
+ * Class for handling operations after finish of registration process.
+ * Example usage: For validation purpose
+ */
+case class RefreshTablePostExecutionEvent(sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo
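
A sketch of consuming these events, assuming OperationListenerBus exposes an
addListener(eventClass, listener) registration as it does for other carbon events
(the listener name is hypothetical):

  import org.apache.carbondata.events._

  object RefreshAuditListener extends OperationEventListener {
    override def onEvent(event: Event, operationContext: OperationContext): Unit = {
      event match {
        case e: RefreshTablePreExecutionEvent =>
          // a validation hook before registration starts
          println(s"refreshing ${e.identifier.getCarbonTableIdentifier.getTableName}")
        case _ =>
      }
    }
  }

  OperationListenerBus.getInstance
    .addListener(classOf[RefreshTablePreExecutionEvent], RefreshAuditListener)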

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 8972b3d..081e5cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1044,48 +1044,48 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     dataType match {
       case "string" =>
         Field(field.column, Some("String"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       case "smallint" =>
         Field(field.column, Some("SmallInt"), field.name, Some(null),
           field.parent, field.storeType, field.schemaOrdinal,
-          field.precision, field.scale, field.rawSchema)
+          field.precision, field.scale, field.rawSchema, field.columnComment)
       case "integer" | "int" =>
         Field(field.column, Some("Integer"), field.name, Some(null),
           field.parent, field.storeType, field.schemaOrdinal,
-          field.precision, field.scale, field.rawSchema)
+          field.precision, field.scale, field.rawSchema, field.columnComment)
       case "long" => Field(field.column, Some("Long"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       case "double" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       case "float" => Field(field.column, Some("Double"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       case "timestamp" =>
         Field(field.column, Some("Timestamp"), field.name, Some(null),
           field.parent, field.storeType, field.schemaOrdinal,
-          field.precision, field.scale, field.rawSchema)
+          field.precision, field.scale, field.rawSchema, field.columnComment)
       case "numeric" => Field(field.column, Some("Numeric"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       case "array" =>
         Field(field.column, Some("Array"), field.name,
           field.children.map(f => f.map(normalizeType(_))),
           field.parent, field.storeType, field.schemaOrdinal,
-          field.precision, field.scale, field.rawSchema)
+          field.precision, field.scale, field.rawSchema, field.columnComment)
       case "struct" =>
         Field(field.column, Some("Struct"), field.name,
           field.children.map(f => f.map(normalizeType(_))),
           field.parent, field.storeType, field.schemaOrdinal,
-          field.precision, field.scale, field.rawSchema)
+          field.precision, field.scale, field.rawSchema, field.columnComment)
       case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       case "decimal" => Field(field.column, Some("Decimal"), field.name, Some(null), field.parent,
-        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema
-      )
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, field.rawSchema,
+        field.columnComment)
       // checking if the nested data type contains the child type as decimal(10,0),
       // if it is present then extracting the precision and scale. resetting the data type
       // with Decimal.
@@ -1098,7 +1098,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
           field.parent,
           field.storeType, field.schemaOrdinal, precision,
           scale,
-          field.rawSchema
+          field.rawSchema,
+          field.columnComment
         )
       case _ =>
         field
@@ -1109,10 +1110,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     field.dataType.getOrElse("NIL") match {
       case "Array" => Field(field.column, Some("Array"), field.name,
         field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
-        field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema)
+        field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema,
+        columnComment = field.columnComment)
       case "Struct" => Field(field.column, Some("Struct"), field.name,
         field.children.map(f => f.map(appendParentForEachChild(_, field.column))), field.parent,
-        field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema)
+        field.storeType, field.schemaOrdinal, rawSchema = field.rawSchema,
+        columnComment = field.columnComment)
       case _ => field
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 37663ea..6a109f4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -67,7 +67,8 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
     children: Option[List[Field]], parent: String = null,
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
-    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") {
+    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "",
+    var columnComment: String = "") {
   override def equals(o: Any) : Boolean = o match {
     case that: Field =>
       that.column.equalsIgnoreCase(this.column)
@@ -320,13 +321,8 @@ class AlterTableColumnSchemaGenerator(
 // TODO: move this to carbon store API
 object TableNewProcessor {
   def apply(
-      cm: TableModel,
-      identifier: AbsoluteTableIdentifier): TableInfo = {
-    new TableNewProcessor(
-      cm,
-      identifier.getDatabaseName,
-      identifier.getTableName,
-      identifier.getTablePath).process
+      cm: TableModel): TableInfo = {
+    new TableNewProcessor(cm).process
   }
 
   def createColumnSchema(
@@ -373,7 +369,7 @@ object TableNewProcessor {
   }
 }
 
-class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, tablePath: String) {
+class TableNewProcessor(cm: TableModel) {
 
   def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
@@ -445,6 +441,14 @@ class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, table
     }
     // TODO: Need to fill RowGroupID, converted type
     // & Number of Children after DDL finalization
+    if (field.columnComment.nonEmpty) {
+      var columnProperties = columnSchema.getColumnProperties
+      if (columnProperties == null) {
+        columnProperties = new util.HashMap[String, String]()
+        columnSchema.setColumnProperties(columnProperties)
+      }
+      columnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, field.columnComment)
+    }
     columnSchema
   }
 
@@ -622,12 +626,12 @@ class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, table
       partitionInfo.setColumnSchemaList(partitionCols)
       tableSchema.setPartitionInfo(partitionInfo)
     }
-    tableSchema.setTableName(tableName)
+    tableSchema.setTableName(cm.tableName)
     tableSchema.setListOfColumns(allColumns.asJava)
     tableSchema.setSchemaEvalution(schemaEvol)
-    tableInfo.setTablePath(tablePath)
-    tableInfo.setDatabaseName(dbName)
-    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
+    tableInfo.setDatabaseName(cm.databaseNameOp.getOrElse(null))
+    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseNameOp.getOrElse(null),
+      cm.tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
     tableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 47f5344..7cc3d11 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -21,8 +21,10 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 
 case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
@@ -51,4 +53,49 @@ object CarbonSparkUtil {
       table)
   }
 
+  /**
+   * returns the formatted column comment if a column comment is present, else an empty string ("")
+   *
+   * @param carbonColumn
+   * @return
+   */
+  def getColumnComment(carbonColumn: CarbonColumn): String = {
+    {
+      val columnProperties = carbonColumn.getColumnProperties
+      if (columnProperties != null) {
+        val comment: String = columnProperties.get(CarbonCommonConstants.COLUMN_COMMENT)
+        if (comment != null) {
+          return " comment \"" + comment + "\""
+        }
+      }
+      ""
+    }
+  }
+
+  /**
+   * the method returns the raw schema
+   *
+   * @param carbonRelation
+   * @return
+   */
+  def getRawSchema(carbonRelation: CarbonRelation): String = {
+    val fields = new Array[String](
+      carbonRelation.dimensionsAttr.size + carbonRelation.measureAttr.size)
+    val carbonTable = carbonRelation.carbonTable
+    carbonRelation.dimensionsAttr.foreach(attr => {
+      val carbonDimension = carbonTable.getDimensionByName(carbonRelation.tableName, attr.name)
+      val carbonColumn = carbonTable.getColumnByName(carbonRelation.tableName, attr.name)
+      val columnComment = getColumnComment(carbonColumn)
+      fields(carbonDimension.getSchemaOrdinal) =
+        '`' + attr.name + '`' + ' ' + attr.dataType.catalogString + columnComment
+    })
+    carbonRelation.measureAttr.foreach(msrAtrr => {
+      val carbonMeasure = carbonTable.getMeasureByName(carbonRelation.tableName, msrAtrr.name)
+      val carbonColumn = carbonTable.getColumnByName(carbonRelation.tableName, msrAtrr.name)
+      val columnComment = getColumnComment(carbonColumn)
+      fields(carbonMeasure.getSchemaOrdinal) =
+        '`' + msrAtrr.name + '`' + ' ' + msrAtrr.dataType.catalogString + columnComment
+    })
+    fields.mkString(",")
+  }
 }
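
For intuition, given a table declared with columns (c1 string comment "customer id",
c2 int), getRawSchema would produce a string of the form

  `c1` string comment "customer id",`c2` int

which CarbonCreateTableCommand below embeds as the column list of the CREATE TABLE
statement it issues.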

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 7fb146c..e61b636 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -308,7 +308,9 @@ object CarbonSource {
       metaStore: CarbonMetaStore,
       properties: Map[String, String]): Map[String, String] = {
     val model = createTableInfoFromParams(properties, dataSchema, identifier)
-    val tableInfo: TableInfo = TableNewProcessor(model, identifier)
+    val tableInfo: TableInfo = TableNewProcessor(model)
+    tableInfo.setTablePath(identifier.getTablePath)
+    tableInfo.setDatabaseName(identifier.getDatabaseName)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
new file mode 100644
index 0000000..45ed298
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.util.CarbonException
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
+import org.apache.carbondata.hadoop.util.SchemaReader
+
+/**
+ * Command to register carbon table from existing carbon table data
+ */
+case class RefreshCarbonTableCommand(
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends MetadataCommand {
+  val LOGGER: LogService =
+    LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    // Steps
+    // 1. get table path
+    // 2. perform the below steps
+    // 2.1 if the table is already registered with hive then ignore it and continue with the
+    // next schema
+    // 2.2 register the table with hive; if the table being registered has aggregate tables
+    // then do the below steps
+    // 2.2.1 validate that all the aggregate tables are copied to the store location.
+    // 2.2.2 register the aggregate tables
+    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+    // 2.1 if the table is already registered with hive then ignore it and continue with the
+    // next schema
+    if (!sparkSession.sessionState.catalog.listTables(databaseName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+      // check the existence of the schema file to know it's a carbon table
+      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      // if the schema file does not exist then the table is either a non-carbon table or a
+      // stale carbon table
+      if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
+        // read TableInfo
+        val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+        // 2.2 register the table with the hive check if the table being registered has
+        // aggregate table then do the below steps
+        // 2.2.1 validate that all the aggregate tables are copied at the store location.
+        val dataMapSchemaList = tableInfo.getDataMapSchemaList
+        if (null != dataMapSchemaList && dataMapSchemaList.size() != 0) {
+          // validate all the aggregate tables are copied at the storeLocation
+          val allExists = validateAllAggregateTablePresent(databaseName,
+            dataMapSchemaList, sparkSession)
+          if (!allExists) {
+            // fail the register operation
+            val msg = s"Table registration with Database name [$databaseName] and Table name " +
+                      s"[$tableName] failed. All the aggregate tables for table [$tableName] are" +
+                      s" not copied under database [$databaseName]"
+            LOGGER.audit(msg)
+            CarbonException.analysisException(msg)
+          }
+          // 2.2.2 register the aggregate tables with hive
+          registerAggregates(databaseName, dataMapSchemaList)(sparkSession)
+        }
+        registerTableWithHive(databaseName, tableName, tableInfo)(sparkSession)
+      } else {
+        LOGGER.audit(
+          s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
+          s"failed. " +
+          s"Table [$tableName] is either a non-carbon table or a stale carbon table under " +
+          s"database [$databaseName]")
+      }
+    } else {
+      LOGGER.audit(
+        s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
+        s"failed. " +
+        s"Table [$tableName] already exists or is registered under database [$databaseName]")
+    }
+    // update the schema modified time
+    metaStore.updateAndTouchSchemasUpdatedTime()
+    Seq.empty
+  }
+
+  /**
+   * the method prepares the data type string for a raw schema column
+   *
+   * @param column
+   * @return
+   */
+  def prepareDataType(column: ColumnSchema): String = {
+    column.getDataType.getName.toLowerCase() match {
+      case "decimal" =>
+        "decimal(" + column.getPrecision + "," + column.getScale + ")"
+      case others =>
+        others
+    }
+  }
+
+  /**
+   * The method registers the carbon table with hive
+   *
+   * @param dbName
+   * @param tableName
+   * @param tableInfo
+   * @param sparkSession
+   * @return
+   */
+  def registerTableWithHive(dbName: String,
+      tableName: String,
+      tableInfo: TableInfo)(sparkSession: SparkSession): Any = {
+    val operationContext = new OperationContext
+    try {
+      val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent =
+        new RefreshTablePreExecutionEvent(sparkSession,
+          tableInfo.getOrCreateAbsoluteTableIdentifier())
+      OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
+      CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false).run(sparkSession)
+      LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " +
+                   s"[$tableName] is successful.")
+    } catch {
+      case e: AnalysisException => throw e
+      case e: Exception =>
+        throw e
+    }
+    val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent =
+      new RefreshTablePostExecutionEvent(sparkSession,
+        tableInfo.getOrCreateAbsoluteTableIdentifier())
+    OperationListenerBus.getInstance.fireEvent(refreshTablePostExecutionEvent, operationContext)
+  }
+
+  /**
+   * The method validates that all the aggregate tables are physically present
+   *
+   * @param dataMapSchemaList
+   * @param sparkSession
+   */
+  def validateAllAggregateTablePresent(dbName: String, dataMapSchemaList: util.List[DataMapSchema],
+      sparkSession: SparkSession): Boolean = {
+    var fileExist = false
+    dataMapSchemaList.asScala.foreach(dataMap => {
+      val tableName = dataMap.getChildSchema.getTableName
+      val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath,
+        new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId))
+      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      try {
+        fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
+      } catch {
+        case e: Exception =>
+          fileExist = false
+      }
+      if (!fileExist) {
+        return fileExist
+      }
+    })
+    return true
+  }
+
+  /**
+   * The method iterates over all the aggregate tables and registers them with hive
+   *
+   * @param dataMapSchemaList
+   * @return
+   */
+  def registerAggregates(dbName: String,
+      dataMapSchemaList: util.List[DataMapSchema])(sparkSession: SparkSession): Any = {
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    dataMapSchemaList.asScala.foreach(dataMap => {
+      val tableName = dataMap.getChildSchema.getTableName
+      if (!sparkSession.sessionState.catalog.listTables(dbName)
+        .exists(_.table.equalsIgnoreCase(tableName))) {
+        val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
+        val absoluteTableIdentifier = AbsoluteTableIdentifier
+          .from(tablePath, dbName, tableName)
+        val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+        registerTableWithHive(dbName, tableName, tableInfo)(sparkSession)
+      }
+    })
+  }
+}
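
As the tests added in this commit exercise, a table with pre-aggregate datamaps can
only be refreshed after all of its child tables have been copied back as well; a
sketch using the backup/restore helpers defined in the test file below:

  backUpData(dblocation, "carbontable")
  backUpData(dblocation, "carbontable_preagg1")
  sql("drop table carbontable")
  restoreData(dblocation, "carbontable")
  // refreshing now would fail with AnalysisException: the child table is missing
  restoreData(dblocation, "carbontable_preagg1")
  sql("refresh table carbontable")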

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 1c23d3a..9a84450 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
@@ -89,7 +90,8 @@ case class CreatePreAggregateTableCommand(
     tableModel.dataMapRelation = Some(fieldRelationMap)
     val tablePath =
       CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
-    CarbonCreateTableCommand(tableModel, Some(tablePath)).run(sparkSession)
+    CarbonCreateTableCommand(TableNewProcessor(tableModel),
+      tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
 
     val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
     val tableInfo = table.getTableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 3ab221c..78b9634 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -29,25 +29,34 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class CarbonCreateTableCommand(
-    cm: TableModel,
+    tableInfo: TableInfo,
+    ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None,
     createDSTable: Boolean = true)
   extends MetadataCommand {
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val tableName = cm.tableName
-    val dbName = CarbonEnv.getDatabaseName(cm.databaseNameOp)(sparkSession)
+    val tableName = tableInfo.getFactTable.getTableName
+    var databaseOpt : Option[String] = None
+    if(tableInfo.getDatabaseName != null) {
+      databaseOpt = Some(tableInfo.getDatabaseName)
+    }
+    val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+    // set dbName and tableUnique Name in the table info
+    tableInfo.setDatabaseName(dbName)
+    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tableName]")
 
     if (sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
-      if (!cm.ifNotExistsSet) {
+      if (!ifNotExistsSet) {
         LOGGER.audit(
           s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
           s"Table [$tableName] already exists under database [$dbName]")
@@ -56,9 +65,9 @@ case class CarbonCreateTableCommand(
     }
 
     val tablePath = tableLocation.getOrElse(
-      CarbonEnv.getTablePath(cm.databaseNameOp, tableName)(sparkSession))
+      CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession))
+    tableInfo.setTablePath(tablePath)
     val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-    val tableInfo: TableInfo = TableNewProcessor(cm, tableIdentifier)
 
     // Add validation for sort scope when create table
     val sortScope = tableInfo.getFactTable.getTableProperties.asScala
@@ -81,15 +90,13 @@ case class CarbonCreateTableCommand(
     val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
     if (createDSTable) {
       try {
-        val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
-        cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
-        cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
-
-        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
         val tablePath = tableIdentifier.getTablePath
+        val carbonRelation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
+        val rawSchema = CarbonSparkUtil.getRawSchema(carbonRelation)
+        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
         sparkSession.sql(
           s"""CREATE TABLE $dbName.$tableName
-             |(${ fields.map(f => f.rawSchema).mkString(",") })
+             |(${ rawSchema })
              |USING org.apache.spark.sql.CarbonSource
              |OPTIONS (
              |  tableName "$tableName",
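
With this change the column list comes from the already-persisted TableInfo rather
than from parser Field objects, so REFRESH (which has no TableModel) can reuse the
same code path. The issued statement has the shape (the remaining OPTIONS are
truncated in the hunk above and elided here as well):

  CREATE TABLE carbon.carbontable
  (`c1` string,`c2` int,`c3` string,`c5` string)
  USING org.apache.spark.sql.CarbonSource
  OPTIONS ( tableName "carbontable", ... )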

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 6c8cf44..9ae1979 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -22,15 +22,16 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand}
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
 import org.apache.spark.sql.execution.command.partition.CarbonShowCarbonPartitionsCommand
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
+import org.apache.spark.sql.execution.datasources.RefreshTable
 import org.apache.spark.util.FileUtils
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -40,7 +41,8 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
  */
 
 class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
-
+  val LOGGER: LogService =
+    LogServiceFactory.getLogService(this.getClass.getName)
   def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
       case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
@@ -193,6 +195,10 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(
           CarbonAlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
       }
+      case RefreshTable(tableIdentifier) =>
+        RefreshCarbonTableCommand(tableIdentifier.database,
+          tableIdentifier.table).run(sparkSession)
+        ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 4b33c40..066cb1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -237,6 +237,4 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       sparkSession,
       schemaConverter)
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 696342f..5a8e58b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -42,6 +42,7 @@ trait CarbonMetaStore {
 
   /**
   * Create spark session from parameters.
+   *
    * @param parameters
    * @param absIdentifier
    * @param sparkSession
@@ -52,8 +53,8 @@ trait CarbonMetaStore {
 
 
   def tableExists(
-    table: String,
-    databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean
+      table: String,
+      databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean
 
   def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
 
@@ -101,6 +102,7 @@ trait CarbonMetaStore {
 
   def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier: AbsoluteTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo)(sparkSession: SparkSession): String
+
   /**
    * Prepare Thrift Schema from wrapper TableInfo and write to disk
    */
@@ -108,6 +110,7 @@ trait CarbonMetaStore {
 
   /**
    * Generates schema string to save it in hive metastore
+   *
    * @param tableInfo
    * @return
    */
@@ -134,16 +137,14 @@ trait CarbonMetaStore {
 
   def checkSchemasModifiedTimeAndReloadTables()
 
-  def isReadFromHiveMetaStore : Boolean
+  def isReadFromHiveMetaStore: Boolean
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
   def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
-
 }
-
 /**
  * Factory for Carbon metastore
  */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 343db49..ffa2d32 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -522,8 +522,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   def getFields(schema: Seq[StructField]): Seq[Field] = {
     schema.map { col =>
       var columnComment: String = ""
+      var plainComment: String = ""
       if (col.getComment().isDefined) {
         columnComment = " comment \"" + col.getComment().get + "\""
+        plainComment = col.getComment().get
       }
       val x =
         if (col.dataType.catalogString == "float") {
@@ -553,6 +555,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         f.dataType = Some("double")
       }
       f.rawSchema = x
+      f.columnComment = plainComment
       f
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 26529f9..55d784f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructField
@@ -33,7 +33,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -228,8 +228,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       bucketFields,
       isAlterFlow = false,
       tableComment)
-
-    CarbonCreateTableCommand(tableModel, tablePath)
+    CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, tablePath)
   }
 
   private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/30457c41/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
new file mode 100644
index 0000000..e48aaba
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.carbondata.register
+
+import java.io.{File, IOException}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+/**
+ * Test cases for registering carbon tables from existing table data via REFRESH TABLE.
+ */
+class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop database if exists carbon cascade")
+  }
+
+  def restoreData(dblocation: String, tableName: String) = {
+    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+      FileUtils.deleteDirectory(new File(source))
+    } catch {
+      case e : Exception =>
+        throw new IOException("carbon table data restore failed.")
+    } finally {
+
+    }
+  }
+  def backUpData(dblocation: String, tableName: String) = {
+    val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+    } catch {
+      case e : Exception =>
+        throw new IOException("carbon table data backup failed.")
+    }
+  }
+
+  test("register tables test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    checkAnswer(sql("select count(*) from carbontable"), Row(1))
+    checkAnswer(sql("select c1 from carbontable"), Seq(Row("a")))
+  }
+
+  test("register table test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    checkAnswer(sql("select count(*) from carbontable"), Row(1))
+    checkAnswer(sql("select c1 from carbontable"), Seq(Row("a")))
+  }
+
+   test("register pre aggregate tables test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'aa','aaa'")
+    sql("insert into carbontable select 'a',10,'aa','aaa'")
+    sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1")
+    backUpData(dblocation, "carbontable")
+    backUpData(dblocation, "carbontable_preagg1")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    restoreData(dblocation, "carbontable_preagg1")
+    sql("refresh table carbontable")
+    checkAnswer(sql("select count(*) from carbontable"), Row(3))
+    checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a")))
+    checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2))
+    checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b")))
+  }
+
+  test("register pre aggregate table test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'aa','aaa'")
+    sql("insert into carbontable select 'a',10,'aa','aaa'")
+    sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1")
+    backUpData(dblocation, "carbontable")
+    backUpData(dblocation, "carbontable_preagg1")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    restoreData(dblocation, "carbontable_preagg1")
+    sql("refresh table carbontable")
+    checkAnswer(sql("select count(*) from carbontable"), Row(3))
+    checkAnswer(sql("select c1 from carbontable"), Seq(Row("a"), Row("b"), Row("a")))
+    checkAnswer(sql("select count(*) from carbontable_preagg1"), Row(2))
+    checkAnswer(sql("select carbontable_c1 from carbontable_preagg1"), Seq(Row("a"), Row("b")))
+  }
+
+  test("register pre aggregate table should fail if the aggregate table not copied") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'aa','aaa'")
+    sql("insert into carbontable select 'a',10,'aa','aaa'")
+    sql("create datamap preagg1 on table carbontable using 'preaggregate' as select c1,sum(c2) from carbontable group by c1")
+    backUpData(dblocation, "carbontable")
+    backUpData(dblocation, "carbontable_preagg1")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    try {
+      sql("refresh table carbontable")
+      assert(false)
+    } catch {
+      case e : AnalysisException =>
+        assert(true)
+    }
+    restoreData(dblocation, "carbontable_preagg1")
+  }
+
+  test("Update operation on carbon table should pass after registration or refresh") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'bb','bbb'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    // update operation
+    sql("""update carbon.carbontable d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
+    sql("""update carbon.carbontable d  set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show()
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from carbon.carbontable"""),
+      Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"))
+    )
+  }
+
+  test("Update operation on carbon table") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql(
+      """
+         CREATE TABLE automerge(id int, name string, city string, age int)
+         STORED BY 'org.apache.carbondata.format'
+      """)
+    val testData = s"$resourcesPath/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table automerge")
+    backUpData(dblocation, "automerge")
+    sql("drop table automerge")
+    restoreData(dblocation, "automerge")
+    sql("refresh table automerge")
+    // update operation
+    sql("""update carbon.automerge d  set (d.id) = (d.id + 1) where d.id > 2""").show()
+    checkAnswer(
+      sql("select count(*) from automerge"),
+      Seq(Row(6))
+    )
+    //    sql("drop table carbontable")
+  }
+
+  test("Delete operation on carbon table") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'bb','bbb'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    // delete operation
+    sql("""delete from carbontable where c3 = 'aa'""").show
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from carbon.carbontable"""),
+      Seq(Row("b",1,"bb","bbb"))
+    )
+    sql("drop table carbontable")
+  }
+
+  test("Alter table add column test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'bb','bbb'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    sql("Alter table carbontable add columns(c4 string) " +
+        "TBLPROPERTIES('DICTIONARY_EXCLUDE'='c4', 'DEFAULT.VALUE.c4'='def')")
+    checkAnswer(
+      sql("""select c1,c2,c3,c5,c4 from carbon.carbontable"""),
+      Seq(Row("a",1,"aa","aaa","def"), Row("b",1,"bb","bbb","def"))
+    )
+    sql("drop table carbontable")
+  }
+
+  test("Alter table change column datatype test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'bb','bbb'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    sql("Alter table carbontable change c2 c2 long")
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from carbon.carbontable"""),
+      Seq(Row("a",1,"aa","aaa"), Row("b",1,"bb","bbb"))
+    )
+    sql("drop table carbontable")
+  }
+
+  test("Alter table drop column test") {
+    sql("drop database if exists carbon cascade")
+    sql(s"create database carbon location '$dblocation'")
+    sql("use carbon")
+    sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql("insert into carbontable select 'a',1,'aa','aaa'")
+    sql("insert into carbontable select 'b',1,'bb','bbb'")
+    backUpData(dblocation, "carbontable")
+    sql("drop table carbontable")
+    restoreData(dblocation, "carbontable")
+    sql("refresh table carbontable")
+    sql("Alter table carbontable drop columns(c2)")
+    checkAnswer(
+      sql("""select * from carbon.carbontable"""),
+      Seq(Row("a","aa","aaa"), Row("b","bb","bbb"))
+    )
+    sql("drop table carbontable")
+  }
+
+  override def afterAll {
+    sql("use default")
+    sql("drop database if exists carbon cascade")
+  }
+}

