carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [1/3] carbondata git commit: [CARBONDATA-1844] Add tablePath support when creating table
Date Mon, 04 Dec 2017 06:36:11 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 25c28242a -> 2fe7758be


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index efb6796..b043698 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -28,8 +28,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -50,16 +49,9 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
 
-  def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
-    if (tableIdentifier.size > 1) {
-      TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
-    } else {
-      TableIdentifier(tableIdentifier(0), None)
-    }
-  }
-
   def deleteDeltaExecution(
-      identifier: Seq[String],
+      databaseNameOp: Option[String],
+      tableName: String,
       sparkSession: SparkSession,
       dataRdd: RDD[Row],
       timestamp: String,
@@ -67,12 +59,10 @@ object DeleteExecution {
       executorErrors: ExecutionErrors): Boolean = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
-    val tableName = getTableIdentifier(identifier).table
-    val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
-    val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
+    val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(absoluteTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val factPath = carbonTablePath.getFactDir
 
     var deleteStatus = true
@@ -85,7 +75,6 @@ object DeleteExecution {
         .map(row => Row(row.get(row.fieldIndex(
           CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
       sparkSession.createDataFrame(rdd, schema).rdd
-      // sqlContext.createDataFrame(rdd, schema).rdd
     } else {
       dataRdd
     }
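
The hunks above replace the positional Seq[String] identifier (and the hand-rolled getTableIdentifier helper) with an explicit databaseNameOp: Option[String] plus tableName: String, resolved through CarbonEnv. A minimal self-contained sketch of that resolution pattern, with illustrative names standing in for CarbonEnv's actual helpers:

    object NameResolutionSketch {
      // databaseNameOp carries an explicit database qualifier when present;
      // otherwise we fall back to the session's current database.
      def getDatabaseName(databaseNameOp: Option[String], currentDb: String): String =
        databaseNameOp.getOrElse(currentDb)

      def main(args: Array[String]): Unit = {
        println(getDatabaseName(Some("db1"), "default")) // prints: db1
        println(getDatabaseName(None, "default"))        // prints: default
      }
    }

Compared with indexing into a Seq (tableIdentifier(0), tableIdentifier(1)), the Option makes the "database omitted" case explicit at the type level.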

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 553663e..bdecac1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -46,7 +46,7 @@ object HorizontalCompaction {
    */
   def tryHorizontalCompaction(
       sparkSession: SparkSession,
-      carbonRelation: CarbonRelation,
+      carbonTable: CarbonTable,
       isUpdateOperation: Boolean): Unit = {
 
     if (!CarbonDataMergerUtil.isHorizontalCompactionEnabled) {
@@ -54,7 +54,6 @@ object HorizontalCompaction {
     }
 
     var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
-    val carbonTable = carbonRelation.carbonTable
     val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val updateTimeStamp = System.currentTimeMillis()
     // To make sure that update and delete timestamps are not same,
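
Here the entry point takes the already-resolved CarbonTable instead of a CarbonRelation wrapper, so callers resolve the table once and the val carbonTable = carbonRelation.carbonTable unwrapping disappears. A schematic sketch of that narrowing, using placeholder types rather than the real classes:

    object CompactionSignatureSketch {
      case class TableSketch(name: String)
      case class RelationSketch(carbonTable: TableSketch)

      // Before: callers handed over the wrapper and the callee unwrapped it.
      def compactViaRelation(relation: RelationSketch): Unit =
        compact(relation.carbonTable)

      // After: the callee declares exactly the dependency it uses.
      def compact(table: TableSketch): Unit =
        println(s"horizontal compaction on ${table.name}")
    }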

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index f8d99c0..a17e745 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -72,7 +73,9 @@ case class CreatePreAggregateTableCommand(
     // also get updated
     tableModel.parentTable = Some(parentTable)
     tableModel.dataMapRelation = Some(fieldRelationMap)
-    CarbonCreateTableCommand(tableModel).run(sparkSession)
+    val tablePath =
+      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+    CarbonCreateTableCommand(tableModel, Some(tablePath)).run(sparkSession)
 
     val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
     val tableInfo = table.getTableInfo
@@ -112,8 +115,10 @@ case class CreatePreAggregateTableCommand(
     // load child table if parent table has existing segments
     val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession)
     val tableName = tableIdentifier.table
-    val metastorePath = CarbonEnv.getMetadataPath(Some(dbName),
-      parentTableIdentifier.table)(sparkSession)
+    val metastorePath = CarbonTablePath.getMetadataPath(
+      CarbonEnv.getTablePath(
+        parentTableIdentifier.database,
+        parentTableIdentifier.table)(sparkSession))
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
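
The load check now derives the metastore path from the table path via CarbonTablePath.getMetadataPath rather than a CarbonEnv shortcut. A self-contained sketch of the path composition this implies; the "Metadata" segment name is an assumption based on CarbonTablePath's conventions:

    object MetadataPathSketch {
      val FileSeparator = "/"
      // Assumed layout: <tablePath>/Metadata holds schema and segment status.
      def getMetadataPath(tablePath: String): String =
        tablePath + FileSeparator + "Metadata"

      def main(args: Array[String]): Unit =
        println(getMetadataPath("/store/default/sales")) // /store/default/sales/Metadata
    }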

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 593a675..9cf36fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -87,8 +87,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
-      val tableMetadataFile = carbonTablePath.getPath
+      val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
+      val tableMetadataFile = oldTablePath.getPath
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
       val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -98,7 +98,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+        metastore.getThriftTableInfo(oldTablePath)(sparkSession)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
@@ -107,8 +107,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      var newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, newTableIdentifier)
-
+      var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
         .asInstanceOf[HiveExternalCatalog].client
@@ -121,8 +120,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+        val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType)
+          .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        newTableName)
         if (!rename) {
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
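
CarbonUtil.getNewTablePath now takes the new table name directly instead of a full CarbonTableIdentifier, and the renamed oldTablePath variable keeps the before/after paths readable. A sketch of the path rewrite, under the assumption (consistent with the renameForce call above) that a table folder is simply renamed in place under the same parent:

    // Assumed behaviour of getNewTablePath: keep the parent directory,
    // swap the final path segment for the new table name.
    def newTablePathSketch(oldTablePath: String, newTableName: String): String = {
      val parent = oldTablePath.substring(0, oldTablePath.lastIndexOf('/'))
      s"$parent/$newTableName"
    }

    // newTablePathSketch("/store/default/old_t", "new_t") == "/store/default/new_t"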

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 9198f57..d967bf2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession, _}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor}
@@ -30,27 +30,35 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
 
 case class CarbonCreateTableCommand(
     cm: TableModel,
+    tableLocation: Option[String] = None,
     createDSTable: Boolean = true)
   extends MetadataCommand {
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val storePath = CarbonProperties.getStorePath
-    CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
-    val dbLocation = GetDB.getDatabaseLocation(cm.databaseName, sparkSession, storePath)
-    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + cm.tableName
-    val tbName = cm.tableName
-    val dbName = cm.databaseName
-    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+    val tableName = cm.tableName
+    val dbName = CarbonEnv.getDatabaseName(cm.databaseNameOp)(sparkSession)
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tableName]")
 
-    val tableInfo: TableInfo = TableNewProcessor(cm)
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+          s"Table [$tableName] already exists under database [$dbName]")
+        throw new TableAlreadyExistsException(dbName, tableName)
+      }
+    }
+
+    val tablePath = tableLocation.getOrElse(
+      CarbonEnv.getTablePath(cm.databaseNameOp, tableName)(sparkSession))
+    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    val tableInfo: TableInfo = TableNewProcessor(cm, tableIdentifier)
 
     // Add validation for sort scope when create table
     val sortScope = tableInfo.getFactTable.getTableProperties.asScala
@@ -65,56 +73,48 @@ case class CarbonCreateTableCommand(
       CarbonException.analysisException("Table should have at least one column.")
     }
 
-    if (sparkSession.sessionState.catalog.listTables(dbName)
-      .exists(_.table.equalsIgnoreCase(tbName))) {
-      if (!cm.ifNotExistsSet) {
-        LOGGER.audit(
-          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
-          s"Table [$tbName] already exists under database [$dbName]")
-        throw new TableAlreadyExistsException(dbName, tbName)
-      }
-    } else {
-      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
-      val operationContext = new OperationContext
-      val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
-        new CreateTablePreExecutionEvent(sparkSession,
-          tableIdentifier.getCarbonTableIdentifier,
-          tablePath)
-      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
-      // Add Database to catalog and persist
-      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
-      if (createDSTable) {
-        try {
-          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
-          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
-          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+    val operationContext = new OperationContext
+    val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
+      CreateTablePreExecutionEvent(sparkSession, tableIdentifier)
+    OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
+    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
+    if (createDSTable) {
+      try {
+        val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
+        cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
+        cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
 
-          sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-          sparkSession.sql(
-            s"""CREATE TABLE $dbName.$tbName
-               |(${ fields.map(f => f.rawSchema).mkString(",") })
-               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
-            s""""$tablePath", path "$tablePath" $carbonSchemaString) """.stripMargin)
-        } catch {
-          case e: AnalysisException => throw e
-          case e: Exception =>
-            // call the drop table to delete the created table.
-            CarbonEnv.getInstance(sparkSession).carbonMetastore
-              .dropTable(tableIdentifier)(sparkSession)
+        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+        val tablePath = tableIdentifier.getTablePath
+        sparkSession.sql(
+          s"""CREATE TABLE $dbName.$tableName
+             |(${ fields.map(f => f.rawSchema).mkString(",") })
+             |USING org.apache.spark.sql.CarbonSource
+             |OPTIONS (
+             |  tableName "$tableName",
+             |  dbName "$dbName",
+             |  tablePath "$tablePath",
+             |  path "$tablePath"
+             |  $carbonSchemaString)
+             """.stripMargin)
+      } catch {
+        case e: AnalysisException => throw e
+        case e: Exception =>
+          // call the drop table to delete the created table.
+          CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .dropTable(tableIdentifier)(sparkSession)
 
-            val msg = s"Create table'$tbName' in database '$dbName' failed."
-            LOGGER.audit(msg)
-            LOGGER.error(e, msg)
-            CarbonException.analysisException(msg)
-        }
+          val msg = s"Create table'$tableName' in database '$dbName' failed."
+          LOGGER.audit(msg)
+          LOGGER.error(e, msg)
+          CarbonException.analysisException(msg)
       }
-      val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
-        new CreateTablePostExecutionEvent(sparkSession, tableIdentifier.getCarbonTableIdentifier)
-      OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
-      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
     }
+    val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
+      CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
+    OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
+    LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tableName]")
     Seq.empty
   }
 }
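
This is the core of CARBONDATA-1844: CarbonCreateTableCommand gains an optional tableLocation, and the table path falls back to the database-derived default only when no explicit location was supplied. The existence check also moves ahead of path resolution, so an IF NOT EXISTS create short-circuits before any path work. A minimal sketch of the fallback; the default path shape is an assumption mirroring the removed dbLocation + separator + tableName code:

    def resolveTablePathSketch(
        tableLocation: Option[String],
        dbLocation: String,
        tableName: String): String =
      tableLocation.getOrElse(s"$dbLocation/$tableName")

    // resolveTablePathSketch(Some("/custom/loc"), "/store/db", "t") == "/custom/loc"
    // resolveTablePathSketch(None, "/store/db", "t")                == "/store/db/t"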

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9ec738c..98f103b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -19,22 +19,17 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
 
 case class CarbonDropTableCommand(
@@ -44,49 +39,28 @@ case class CarbonDropTableCommand(
     dropChildTable: Boolean = false)
   extends AtomicRunnableCommand {
 
+  var carbonTable: CarbonTable = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val identifier = TableIdentifier(tableName, Option(dbName))
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
-    val carbonEnv = CarbonEnv.getInstance(sparkSession)
-    val catalog = carbonEnv.carbonMetastore
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val absoluteTableIdentifier =
-      AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(absoluteTableIdentifier, lock)
+        lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      val carbonTable: Option[CarbonTable] =
-        catalog.getTableFromMetadataCache(dbName, tableName) match {
-          case Some(carbonTable) => Some(carbonTable)
-          case None => try {
-            Some(catalog.lookupRelation(identifier)(sparkSession)
-              .asInstanceOf[CarbonRelation].metaData.carbonTable)
-          } catch {
-            case ex: NoSuchTableException =>
-              if (!ifExistsSet) {
-                throw ex
-              }
-              None
-          }
-        }
-      if (carbonTable.isDefined) {
-        val relationIdentifiers = carbonTable.get.getTableInfo.getParentRelationIdentifiers
-        if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
-          if (!dropChildTable) {
-            if (!ifExistsSet) {
-              throw new Exception("Child table which is associated with datamap cannot " +
-                                  "be dropped, use DROP DATAMAP command to drop")
-            } else {
-              return Seq.empty
-            }
+      carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+      val relationIdentifiers = carbonTable.getTableInfo.getParentRelationIdentifiers
+      if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
+        if (!dropChildTable) {
+          if (!ifExistsSet) {
+            throw new Exception("Child table which is associated with datamap cannot " +
+                                "be dropped, use DROP DATAMAP command to drop")
+          } else {
+            return Seq.empty
           }
         }
       }
@@ -97,8 +71,7 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .dropTable(absoluteTableIdentifier)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
 
       // fires the event after dropping main table
       val dropTablePostEvent: DropTablePostEvent =
@@ -108,7 +81,12 @@ case class CarbonDropTableCommand(
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+
     } catch {
+      case ex: NoSuchTableException =>
+        if (!ifExistsSet) {
+          throw ex
+        }
       case ex: Exception =>
         LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
         CarbonException.analysisException(
@@ -125,23 +103,16 @@ case class CarbonDropTableCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // delete the table folder
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
     if (carbonTable != null) {
       // clear driver side index and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
-    }
-    val metadataFilePath =
-      CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
-    val fileType = FileFactory.getFileType(metadataFilePath)
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      // delete the table folder
+      val tablePath = carbonTable.getTablePath
+      val fileType = FileFactory.getFileType(tablePath)
+      if (FileFactory.isFileExist(tablePath, fileType)) {
+        val file = FileFactory.getCarbonFile(tablePath, fileType)
+        CarbonUtil.deleteFoldersAndFilesSilent(file)
+      }
     }
     Seq.empty
   }
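
The drop command now resolves the CarbonTable once in processMetadata, caches it in a field, and lets processData reuse it to delete the whole table folder (carbonTable.getTablePath) instead of recomputing the path from the database location and store path. A sketch of that two-phase shape; the split of responsibilities is an assumption drawn from the structure above, not AtomicRunnableCommand's actual contract:

    // Phase 1 resolves metadata and takes locks; phase 2 does irreversible I/O.
    abstract class TwoPhaseCommandSketch {
      protected var tablePath: String = _
      def processMetadata(): Unit // resolve table, cache tablePath, fire events
      def processData(): Unit     // delete the folder at tablePath, clear caches
    }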

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 8c2acdb..d08ef26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -22,9 +22,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command.AlterTableRenameCommand
-import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand, DeleteExecution}
+import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -35,14 +34,14 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
-      case CarbonProjectForUpdateCommand(_, tableIdentifier) =>
+      case CarbonProjectForUpdateCommand(_, databaseNameOp, tableName) =>
         rejectIfStreamingTable(
-          DeleteExecution.getTableIdentifier(tableIdentifier),
+          TableIdentifier(tableName, databaseNameOp),
           "Data update")
         Nil
-      case CarbonProjectForDeleteCommand(_, tableIdentifier, _) =>
+      case CarbonProjectForDeleteCommand(_, databaseNameOp, tableName, timestamp) =>
         rejectIfStreamingTable(
-          DeleteExecution.getTableIdentifier(tableIdentifier),
+          TableIdentifier(tableName, databaseNameOp),
           "Date delete")
         Nil
       case CarbonAlterTableAddColumnCommand(model) =>
@@ -73,11 +72,9 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
    * Validate whether Update operation is allowed for specified table in the command
    */
   private def rejectIfStreamingTable(tableIdentifier: TableIdentifier, operation: String): Unit = {
-    val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-      .carbonTable
-      .isStreamingTable
+    val streaming =
+      CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+        .isStreamingTable
     if (streaming) {
       throw new MalformedCarbonCommandException(
         s"$operation is not allowed for streaming table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 846b64c..0b3a2b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -127,8 +127,6 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     } else {
       updatedSelectPlan
     }
-    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
-    val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
     val destinationTable =
       CarbonReflectionUtils.getUnresolvedRelation(
         table.tableIdentifier,
@@ -142,8 +140,6 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   def processDeleteRecordsQuery(selectStmt: String,
       alias: Option[String],
       table: UnresolvedRelation): LogicalPlan = {
-    val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
-      table.tableIdentifier.table)
     var addedTupleId = false
     val parsePlan = parser.parsePlan(selectStmt)
 
@@ -160,7 +156,8 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     }
     CarbonProjectForDeleteCommand(
       selectPlan,
-      tidSeq,
+      table.tableIdentifier.database,
+      table.tableIdentifier.table,
       System.currentTimeMillis().toString)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 3134712..5078259 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -215,13 +215,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
       val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
-      val schemaFilePath = CarbonStorePath
-        .getCarbonTablePath(tablePath, carbonTableIdentifier).getSchemaFilePath
-      wrapperTableInfo.setTablePath(tablePath)
-      wrapperTableInfo
-        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+      val wrapperTableInfo =
+        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       metadata.carbonTables += carbonTable
@@ -233,38 +228,31 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   /**
    * This method will overwrite the existing schema and update it with the given details
-   *
-   * @param newTableIdentifier
-   * @param thriftTableInfo
-   * @param schemaEvolutionEntry
-   * @param tablePath
-   * @param sparkSession
    */
-  def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchemaForAlter(
+      newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, oldTableIdentifier)
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    val oldCarbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val newAbsoluteTableIdentifier = new AbsoluteTableIdentifier(CarbonUtil
-      .getNewTablePath(oldCarbonTablePath, newTableIdentifier), newTableIdentifier)
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        newAbsoluteTableIdentifier.getTablePath)
-    val identifier =
-      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        wrapperTableInfo.getFactTable.getTableId)
-    val path = createSchemaThriftFile(wrapperTableInfo,
+    val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+    val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
-      identifier)
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      newTablePath)
+    val newAbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      newTablePath,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      oldTableIdentifier.getTableId)
+    val path = createSchemaThriftFile(newAbsoluteTableIdentifier, thriftTableInfo)
     addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
     path
   }
@@ -280,40 +268,33 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        carbonTableIdentifier.getDatabaseName,
-        carbonTableIdentifier.getTableName,
-        absoluteTableIdentifier.getTablePath)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName,
+      absoluteTableIdentifier.getTablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
-    val path = createSchemaThriftFile(wrapperTableInfo,
-      thriftTableInfo,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
+    val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
     addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
-  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
-  AbsoluteTableIdentifier,
+  override def revertTableSchemaForPreAggCreationFailure(
+      absoluteTableIdentifier: AbsoluteTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-        absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-        absoluteTableIdentifier.getTablePath)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+      absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+      absoluteTableIdentifier.getTablePath)
     val childSchemaList = wrapperTableInfo.getDataMapSchemaList
     childSchemaList.remove(childSchemaList.size() - 1)
-    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
-    val path = createSchemaThriftFile(wrapperTableInfo,
-      thriftTableInfo,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
+    val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
     addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
-
   }
 
   /**
@@ -326,26 +307,19 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val dbName = tableInfo.getDatabaseName
     val tableName = tableInfo.getFactTable.getTableName
-    val thriftTableInfo = schemaConverter
-      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+    val thriftTableInfo = schemaConverter.fromWrapperToExternalTableInfo(
+      tableInfo, dbName, tableName)
     val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-    tableInfo.setTablePath(identifier.getTablePath)
-    createSchemaThriftFile(tableInfo,
-      thriftTableInfo,
-      identifier.getCarbonTableIdentifier)
+    createSchemaThriftFile(identifier, thriftTableInfo)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
   }
 
   /**
    * Generates schema string from TableInfo
    */
-  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+  override def generateTableSchemaString(
+      tableInfo: schema.table.TableInfo,
       absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
@@ -357,19 +331,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   /**
    * This method will write the schema thrift file in carbon store and load table metadata
-   *
-   * @param tableInfo
-   * @param thriftTableInfo
-   * @return
    */
-  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
-      thriftTableInfo: TableInfo,
-      carbonTableIdentifier: CarbonTableIdentifier): String = {
-    val carbonTablePath = CarbonStorePath.
-      getCarbonTablePath(tableInfo.getTablePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(
+      identifier: AbsoluteTableIdentifier,
+      thriftTableInfo: TableInfo): String = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -382,8 +350,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
     carbonTablePath.getPath
   }
 
-  protected def addTableCache(tableInfo: table.TableInfo,
-      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+  protected def addTableCache(
+      tableInfo: table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier): ArrayBuffer[CarbonTable] = {
     val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
@@ -427,15 +396,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   def updateMetadataByThriftTable(schemaFilePath: String,
       tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
-
     tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis())
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
-    wrapperTableInfo
-      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setTablePath(tablePath)
+    val wrapperTableInfo =
+      schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
     updateMetadataByWrapperTable(wrapperTableInfo)
   }
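
Across CarbonFileMetastore the pattern is uniform: the setTablePath/setMetaDataFilepath mutations on the wrapper TableInfo drop away, and createSchemaThriftFile takes the AbsoluteTableIdentifier plus thrift TableInfo directly, deriving the schema location from the identifier on demand. A sketch of that derivation; the path segments are assumptions consistent with the Metadata layout sketched earlier:

    object SchemaPathSketch {
      case class IdentifierSketch(tablePath: String)

      // What used to be stored via setMetaDataFilepath is now recomputed
      // from the identifier whenever it is needed.
      def schemaFilePath(id: IdentifierSketch): String =
        s"${id.tablePath}/Metadata/schema"
    }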
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index f98a53a..a41c51e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -156,20 +156,16 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
   private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String,
+      oldTablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
-    val tablePath = CarbonUtil.getNewTablePath(new Path(carbonStorePath), newTableIdentifier)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, newTableIdentifier)
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        carbonTablePath.toString)
-    wrapperTableInfo.setTablePath(carbonStorePath)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val newTablePath =
+      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      newTablePath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
@@ -180,7 +176,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
+    CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath
   }
 
   private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
@@ -189,23 +185,19 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       tablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier)
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        newTablePath)
-    wrapperTableInfo.setTablePath(newTablePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(newTablePath, newTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val newTablePath =
+      CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier.getTableName)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      newTablePath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    carbonTablePath.getPath
+    newTablePath
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index cf671cb..b358f83 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -42,8 +41,7 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     }
     var databaseLocation = ""
     try {
-      databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-        CarbonProperties.getStorePath)
+      databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
     } catch {
       case e: NoSuchDatabaseException =>
         // ignore the exception as exception will be handled by hive command.run

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index e3f0d21..2e39f5e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -125,21 +125,18 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
         var isTransformed = false
         val newPlan = updatePlan transform {
-          case Project(pList, child) if (!isTransformed) =>
+          case Project(pList, child) if !isTransformed =>
             val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
               .splitAt(pList.size - cols.size)
             val diff = cols.diff(dest.map(_.name.toLowerCase))
-            if (diff.size > 0) {
+            if (diff.nonEmpty) {
               sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
             }
             isTransformed = true
             Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
         }
-        val identifier = table.tableIdentifier.database match {
-          case Some(db) => Seq(db, table.tableIdentifier.table)
-          case _ => Seq(table.tableIdentifier.table)
-        }
-        CarbonProjectForUpdateCommand(newPlan, identifier)
+        CarbonProjectForUpdateCommand(
+          newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 46336ac..6f7b89a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -494,9 +494,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           None,
           true)
 
-        val alterTableAddColumnsModel = AlterTableAddColumnsModel(convertDbNameToLowerCase(dbName),
+        val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+          convertDbNameToLowerCase(dbName),
           table,
-          tableProps,
+          tableProps.toMap,
           tableModel.dimCols,
           tableModel.msrCols,
           tableModel.highcardinalitydims.getOrElse(Seq.empty))
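
tableProps is a mutable map accumulated during parsing; calling .toMap hands AlterTableAddColumnsModel an immutable snapshot, so later parser mutations cannot leak into the model. The conversion in isolation:

    import scala.collection.mutable

    val tableProps = mutable.Map("sort_scope" -> "local_sort")
    val snapshot: Map[String, String] = tableProps.toMap // defensive, immutable copy
    tableProps("sort_scope") = "global_sort"             // does not affect snapshot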

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 07dc672..26529f9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 
 import org.antlr.v4.runtime.tree.TerminalNode
-import org.apache.spark.sql.{CarbonSession, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -145,11 +145,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       partitionColumns: ColTypeListContext,
       columns : ColTypeListContext,
       tablePropertyList : TablePropertyListContext,
+      locationSpecContext: SqlBaseParser.LocationSpecContext,
       tableComment : Option[String],
       ctas: TerminalNode) : LogicalPlan = {
     // val parser = new CarbonSpark2SqlParser
 
-    val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+    val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
     // TODO: implement temporary tables
     if (temp) {
       throw new ParseException(
@@ -178,8 +179,14 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-                          duplicateColumns.mkString("[", ",", "]"), columns)
+      operationNotAllowed(s"Duplicated column names found in table definition of " +
+                          s"$tableIdentifier: ${duplicateColumns.mkString("[", ",", "]")}", columns)
+    }
+
+    val tablePath = if (locationSpecContext != null) {
+      Some(visitLocationSpec(locationSpecContext))
+    } else {
+      None
     }
 
     val tableProperties = mutable.Map[String, String]()
@@ -211,9 +218,10 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
     validateStreamingProperty(options)
 
     // prepare table model of the collected tokens
-    val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
-      convertDbNameToLowerCase(name.database),
-      name.table.toLowerCase,
+    val tableModel: TableModel = parser.prepareTableModel(
+      ifNotExists,
+      convertDbNameToLowerCase(tableIdentifier.database),
+      tableIdentifier.table.toLowerCase,
       fields,
       partitionFields,
       tableProperties,
@@ -221,7 +229,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       isAlterFlow = false,
       tableComment)
 
-    CarbonCreateTableCommand(tableModel)
+    CarbonCreateTableCommand(tableModel, tablePath)
   }
 
   private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
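
The parser change is what surfaces the feature in DDL: a LOCATION clause, when present, is captured as Some(path) and threaded into CarbonCreateTableCommand, so a statement of the form CREATE TABLE ... LOCATION '<path>' can supply the table path. The explicit null check above could equally be written with Option's null-guarding constructor; a self-contained analogue:

    // Option(x) is None when x is null, mirroring the if/else in the hunk.
    def locationToOption[Ctx, Path](ctx: Ctx)(visit: Ctx => Path): Option[Path] =
      Option(ctx).map(visit)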

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index cba315b..58b3362 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -194,11 +194,10 @@ object AlterTableUtil {
       oldTableIdentifier.table, tableId)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newCarbonTableIdentifier)
+    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val tableMetadataFile = carbonTablePath.getPath
-    val fileType = FileFactory.getFileType(tableMetadataFile)
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+    val fileType = FileFactory.getFileType(tablePath)
+    if (FileFactory.isFileExist(tablePath, fileType)) {
       val tableInfo = if (metastore.isReadFromHiveMetaStore) {
         // In case of hive metastore we first update the carbonschema inside old table only.
         metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
@@ -213,7 +212,8 @@ object AlterTableUtil {
         FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
-        val absoluteTableIdentifier = new AbsoluteTableIdentifier(newTablePath,
+        val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+          newTablePath,
           newCarbonTableIdentifier)
         metastore.revertTableSchemaInAlterFailure(oldCarbonTableIdentifier,
           tableInfo, absoluteTableIdentifier)(sparkSession)
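
This hunk also swaps the AbsoluteTableIdentifier constructor for the AbsoluteTableIdentifier.from factory and derives the renamed table's path from the new table name alone. A small sketch of the resulting call shapes; only the shapes mirror the patch, the identifiers and paths are made up:

  import org.apache.hadoop.fs.Path
  import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
  import org.apache.carbondata.core.util.CarbonUtil

  // Illustrative identifiers; getNewTablePath now takes the new table name
  // instead of a CarbonTableIdentifier.
  val newTableId = new CarbonTableIdentifier("default", "new_tbl", "uuid-1")
  val newTablePath = CarbonUtil.getNewTablePath(new Path("/tmp/store/default/old_tbl"), "new_tbl")
  val absoluteId = AbsoluteTableIdentifier.from(newTablePath, newTableId)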

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index b6df33e..9f66737 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -251,6 +251,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
           ctx.partitionColumns,
           ctx.columns,
           ctx.tablePropertyList,
+          ctx.locationSpec,
           Option(ctx.STRING()).map(string),
           ctx.AS)
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 4cb9c6d..56c5747 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -45,6 +45,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     val carbonLoadModel = new CarbonLoadModel
     carbonLoadModel.setTableName(relation.carbonTable.getTableName)
     carbonLoadModel.setDatabaseName(relation.carbonTable.getDatabaseName)
+    carbonLoadModel.setTablePath(relation.metaData.carbonTable.getTablePath)
     val table = relation.carbonTable
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
@@ -145,7 +146,6 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      sampleRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
 
     DictionaryTestCaseUtil.
@@ -158,7 +158,6 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      complexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
 
     DictionaryTestCaseUtil.
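
With this patch the table path is no longer passed to generateGlobalDictionary as a separate argument; it travels on the CarbonLoadModel via setTablePath, as the setup code above now does. A hedged sketch of the trimmed call, with the context objects taken as parameters:

  import org.apache.spark.sql.SQLContext
  import org.apache.carbondata.core.datastore.impl.FileFactory
  import org.apache.carbondata.processing.loading.model.CarbonLoadModel
  import org.apache.carbondata.spark.util.GlobalDictionaryUtil

  // Assumes model.setTablePath(...) was already called during model setup.
  def buildDictionary(sqlContext: SQLContext, model: CarbonLoadModel): Unit =
    GlobalDictionaryUtil.generateGlobalDictionary(
      sqlContext, model, FileFactory.getConfiguration)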

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index 4551120..8467f8d 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -41,7 +41,7 @@ object DictionaryTestCaseUtil {
     val table = relation.carbonTable
     val dimension = table.getDimensionByName(table.getTableName, columnName)
     val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid")
-    val  absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
+    val  absoluteTableIdentifier = AbsoluteTableIdentifier.from(table.getTablePath, tableIdentifier)
     val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
       dimension.getColumnIdentifier, dimension.getDataType,
       CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index da5469a..cad7807 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -145,7 +145,8 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       .asInstanceOf[CarbonRelation]
   }
 
-  def buildCarbonLoadModel(relation: CarbonRelation,
+  def buildCarbonLoadModel(
+      relation: CarbonRelation,
       filePath: String,
       header: String,
       extColFilePath: String,
@@ -157,6 +158,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTableName(table.getTableName)
+    carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
@@ -201,7 +203,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      extComplexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
@@ -213,7 +214,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      extComplexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check the old dictionary and whether the new distinct value is generated
     DictionaryTestCaseUtil.checkDictionary(
@@ -229,7 +229,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      extComplexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
@@ -241,7 +240,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      verticalDelimiteRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index cec7bbc..48519fd 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -219,12 +219,10 @@ class DataLoadFailAllTypeSortTest extends Spark2QueryTest with BeforeAndAfterAll
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
       sql("create table data_tbm(name String, dob long, weight int) " +
-          "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " +
+          "stored by 'carbondata' tblproperties('bucketnumber'='4', " +
           "'bucketcolumns'='name', 'tableName'='data_tbm')")
       val testData = s"$resourcesPath/badrecords/dummy.csv"
       sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_tbm""")
-
-
     } catch {
       case x: Throwable => {
         assert(x.getMessage.contains("Data load failed due to bad record"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 778e1b3..4b7ce63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -108,8 +108,9 @@ public class FieldEncoderFactory {
                   dataField.getColumn().getDataType());
           CarbonTablePath carbonTablePath =
               CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-          AbsoluteTableIdentifier parentAbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier),
+          AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
+              AbsoluteTableIdentifier.from(
+              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
               parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index db3442e..cda31c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -25,17 +25,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -254,12 +244,10 @@ public final class CarbonLoaderUtil {
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
+  public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
       throws IOException, InterruptedException {
     boolean status = false;
-    String metaDataFilepath =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
@@ -277,7 +265,7 @@ public final class CarbonLoaderUtil {
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+            SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         List<CarbonFile> staleFolders = new ArrayList<>();
@@ -857,18 +845,13 @@ public final class CarbonLoaderUtil {
 
   /**
    * This method will get the store location for the given path, segment id and partition id
-   *
-   * @param carbonStorePath
-   * @param segmentId
    */
-  public static void checkAndCreateCarbonDataLocation(String carbonStorePath,
-      String segmentId, CarbonTable carbonTable) {
+  public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
     CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+        CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
+    String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
   /**
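
Alongside the recordLoadMetadata -> recordNewLoadMetadata rename, checkAndCreateCarbonDataLocation drops its store-path parameter and resolves the segment directory from the table itself. A hedged sketch of the new call shape; the wrapper function is hypothetical:

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.processing.util.CarbonLoaderUtil

  // The directory is now derived internally from carbonTable.getTablePath.
  def createSegmentDir(segmentId: String, carbonTable: CarbonTable): Unit =
    CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segmentId, carbonTable)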

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 485e718..7925b35 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -18,34 +18,26 @@
 package org.apache.carbondata.carbon.datastore;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.BlockIndexStore;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.StoreCreator;
 
 import junit.framework.TestCase;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class BlockIndexStoreTest extends TestCase {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 8c2f7bb..78c99d6 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -43,7 +43,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
@@ -53,6 +52,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -73,15 +73,15 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.util.TableOptionConstant;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.util.TableOptionConstant;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
@@ -106,7 +106,8 @@ public class StoreCreator {
       String dbName = "testdb";
       String tableName = "testtable";
       absoluteTableIdentifier =
-          new AbsoluteTableIdentifier(storePath + "/testdb/testtable",
+          AbsoluteTableIdentifier.from(
+              storePath + "/testdb/testtable",
               new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
     } catch (IOException ex) {
 
@@ -268,7 +269,6 @@ public class StoreCreator {
             absoluteTableIdentifier.getCarbonTableIdentifier());
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
-    tableInfo.setMetaDataFilepath(schemaMetadataPath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index f268883..8c4d5ba 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -293,7 +293,7 @@ object StreamHandoffRDD {
         SegmentStatus.INSERT_IN_PROGRESS,
         carbonLoadModel.getFactTimeStamp,
         false)
-      CarbonLoaderUtil.recordLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
         sqlContext.sparkContext,

