carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [2/2] carbondata git commit: Changed default store location to spark-warehouse
Date Thu, 20 Jul 2017 02:49:24 GMT
Changed default store location to spark-warehouse

refactored code to use common code

Fixed style


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34d2870a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34d2870a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34d2870a

Branch: refs/heads/metadata
Commit: 34d2870aebd10adfacd56e5892a9b1149fb6a08e
Parents: 8d3c9bf
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Fri Jul 14 11:12:57 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Jul 20 10:45:22 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 -
 .../core/metadata/AbsoluteTableIdentifier.java  |   5 +-
 .../carbondata/core/util/CarbonProperties.java  |   4 -
 .../apache/carbondata/core/util/CarbonUtil.java |  28 +-
 .../spark/sql/common/util/QueryTest.scala       |   6 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |   6 +-
 .../org/apache/carbondata/api/CarbonStore.scala |   3 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   1 -
 .../spark/thriftserver/CarbonThriftServer.scala |   4 +-
 .../org/apache/spark/sql/CarbonContext.scala    |  12 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   3 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   4 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   4 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   3 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   4 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  12 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  13 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  54 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |   2 +-
 .../execution/command/AlterTableCommands.scala  |  16 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../sql/execution/command/DDLStrategy.scala     |   3 +-
 .../execution/command/carbonTableSchema.scala   |  35 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 383 +++++++++----------
 .../spark/sql/hive/CarbonHiveMetaStore.scala    | 132 +------
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  54 ++-
 .../spark/sql/hive/CarbonSessionState.scala     |   7 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   2 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  12 +-
 .../org/apache/spark/util/CleanFiles.scala      |   3 +-
 .../org/apache/spark/util/Compaction.scala      |   3 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   3 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   3 +-
 .../org/apache/spark/util/ShowSegments.scala    |   3 +-
 .../org/apache/spark/util/TableLoader.scala     |   3 +-
 .../apache/spark/util/CarbonCommandSuite.scala  |  27 +-
 .../carbondata/processing/merger/TableMeta.java |   4 +-
 .../util/CarbonDataProcessorUtil.java           |   6 +-
 39 files changed, 410 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 55a292e..f6e5c62 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -71,10 +71,6 @@ public final class CarbonCommonConstants {
   @CarbonProperty
   public static final String SORT_SIZE = "carbon.sort.size";
   /**
-   * default location of the carbon member, hierarchy and fact files
-   */
-  public static final String STORE_LOCATION_DEFAULT_VAL = "../carbon.store";
-  /**
    * CARDINALITY_INCREMENT_DEFAULT_VALUE
    */
   public static final int CARDINALITY_INCREMENT_VALUE_DEFAULT_VAL = 10;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 3c39145..22faaf2 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * identifier which will have store path and carbon table identifier
@@ -68,9 +67,9 @@ public class AbsoluteTableIdentifier implements Serializable {
     return carbonTableIdentifier;
   }
 
-  public static AbsoluteTableIdentifier from(String dbName, String tableName) {
+  public static AbsoluteTableIdentifier from(String storePath, String dbName, String tableName) {
     CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
-    return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
+    return new AbsoluteTableIdentifier(storePath, identifier);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index d14b7ab..2f5874b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -81,10 +81,6 @@ public final class CarbonProperties {
     } catch (IllegalAccessException e) {
       LOGGER.error("Illelagal access to declared field" + e.getMessage());
     }
-    if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
-      carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
-          CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-    }
 
     validateBlockletSize();
     validateNumCores();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9c164a..59f8cd8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -730,15 +730,6 @@ public final class CarbonUtil {
         .startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX);
   }
 
-  public static String getCarbonStorePath() {
-    CarbonProperties prop = CarbonProperties.getInstance();
-    if (null == prop) {
-      return null;
-    }
-    return prop.getProperty(CarbonCommonConstants.STORE_LOCATION,
-        CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-  }
-
   /**
    * This method will check the existence of a file at a given path
    */
@@ -1800,6 +1791,25 @@ public final class CarbonUtil {
   }
 
   /**
+   * Removes schema from properties
+   * @param properties
+   * @return
+   */
+  public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) {
+    Map<String, String> newMap = new HashMap<>();
+    newMap.putAll(properties);
+    String partsNo = newMap.get("carbonSchemaPartsNo");
+    if (partsNo == null) {
+      return newMap;
+    }
+    int no = Integer.parseInt(partsNo);
+    for (int i = 0; i < no; i++) {
+      newMap.remove("carbonSchema" + i);
+    }
+    return newMap;
+  }
+
+  /**
    * This method will read the schema file from a given path
    *
    * @param schemaFilePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9912ec4..9926c57 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 class QueryTest extends PlanTest {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -85,7 +88,8 @@ class QueryTest extends PlanTest {
 
   val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
 
-  val storeLocation = TestQueryExecutor.storeLocation
+  lazy val storeLocation = CarbonProperties.getInstance().
+    getProperty(CarbonCommonConstants.STORE_LOCATION)
   val resourcesPath = TestQueryExecutor.resourcesPath
   val integrationPath = TestQueryExecutor.integrationPath
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 5b603aa..5cef14a 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -213,8 +213,10 @@ public final class CarbonLoaderUtil {
     String tempLocationKey = CarbonDataProcessorUtil
         .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow);
     // form local store location
-    final String localStoreLocation = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (localStoreLocation == null) {
+      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+    }
     // submit local folder clean up in another thread so that main thread execution is not blocked
     ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 45719fc..dc37360 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -157,8 +157,9 @@ object CarbonStore {
   def isSegmentValid(
       dbName: String,
       tableName: String,
+      storePath: String,
       segmentId: String): Boolean = {
-    val identifier = AbsoluteTableIdentifier.from(dbName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
     val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
         SegmentStatusManager(
           identifier).getValidAndInvalidSegments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index b76bca3..149e3b1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -53,7 +53,6 @@ object TestQueryExecutor {
   val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
   CarbonProperties.getInstance()
     .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
-    .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
     .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords")
   private def lookupQueryExecutor: Class[_] = {
     ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index b8ba9f7..f8275d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -57,8 +57,8 @@ object CarbonThriftServer {
                   "Using default Value and proceeding")
         Thread.sleep(30000)
     }
-
-    val cc = new CarbonContext(sc, args.head)
+    val storePath = if (args.length > 0) args.head else null
+    val cc = new CarbonContext(sc, storePath)
 
     HiveThriftServer2.startWithContext(cc)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 1aeda95..da4b210 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -44,7 +44,7 @@ class CarbonContext(
 
   def this(sc: SparkContext) = {
     this(sc,
-      new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+      null,
       new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
   }
 
@@ -66,8 +66,14 @@ class CarbonContext(
 
   @transient
   override lazy val catalog = {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+    val carbonProperties = CarbonProperties.getInstance()
+    if (storePath != null) {
+      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+      // In case if it is in carbon.properties for backward compatible
+    } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+      carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+        conf.getConfString("spark.sql.warehouse.dir"))
+    }
     new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 2fc93e6..71ef6a6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -69,7 +69,8 @@ private[sql] case class CarbonDatasourceHadoopRelation(
       carbonTable.getDatabaseName,
       carbonTable.getFactTableName,
       CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
+      new TableMeta(carbonTable.getCarbonTableIdentifier,
+        paths.head, absIdentifier.getTablePath, carbonTable),
       None
     )(sqlContext)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index e8d3907..7790f59 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -236,7 +236,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
                   CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
                   val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
                   metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
+                    null, carbonTable)
                 }
               }
             })
@@ -281,7 +281,7 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
     tableInfo.setStorePath(storePath)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null,
       CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
 
     val fileType = FileFactory.getFileType(schemaMetadataPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index aba6891..34ac940 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -53,9 +53,9 @@ object CarbonThriftServer {
       System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath"))
     }
 
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION, args.head)
+    val storePath = if (args.length > 0) args.head else null
 
-    val spark = builder.getOrCreateCarbonSession(args.head)
+    val spark = builder.getOrCreateCarbonSession(storePath)
     val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
     try {
       Thread.sleep(Integer.parseInt(warmUpTime))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index d1d3015..de7f3fb 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -48,7 +48,8 @@ object CarbonSparkUtil {
   def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
     val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val table = CarbonTable.buildFromTableInfo(tableInfo)
-    val meta = new TableMeta(identifier.getCarbonTableIdentifier, identifier.getStorePath, table)
+    val meta = new TableMeta(identifier.getCarbonTableIdentifier,
+      identifier.getStorePath, tablePath, table)
     CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
       CarbonSparkUtil.createSparkMeta(table), meta)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 1054c62..805a421 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 33091aa..43f6a21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -67,7 +67,7 @@ case class CarbonDictionaryDecoder(
 
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+      val storePath = CarbonEnv.getInstance(sparkSession).storePath
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -121,7 +121,7 @@ case class CarbonDictionaryDecoder(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
 
-    val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d19eb39..9d10ea0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -40,6 +40,8 @@ class CarbonEnv {
 
   var carbonSessionInfo: CarbonSessionInfo = _
 
+  var storePath: String = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -61,10 +63,14 @@ class CarbonEnv {
       // add session params after adding DefaultCarbonParams
       config.addDefaultCarbonSessionParams()
       carbonMetastore = {
-        val storePath =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+        val properties = CarbonProperties.getInstance()
+        storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+        if (storePath == null) {
+          storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+          properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+        }
         LOGGER.info(s"carbon env initial: $storePath")
-        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf, storePath)
+        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index b436891..7390cf3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -65,7 +65,7 @@ object CarbonSession {
 
     def getOrCreateCarbonSession(): SparkSession = {
       getOrCreateCarbonSession(
-        new File(CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL).getCanonicalPath,
+        null,
         new File(CarbonCommonConstants.METASTORE_LOCATION_DEFAULT_VAL).getCanonicalPath)
     }
 
@@ -145,9 +145,14 @@ object CarbonSession {
           }
           sc
         }
-
-        CarbonProperties.getInstance()
-          .addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+        val carbonProperties = CarbonProperties.getInstance()
+        if (storePath != null) {
+          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+          // In case if it is in carbon.properties for backward compatible
+        } else if (carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION) == null) {
+          carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION,
+            sparkContext.conf.get("spark.sql.warehouse.dir"))
+        }
         session = new CarbonSession(sparkContext)
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 498ea03..bec163b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -25,15 +25,19 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.{CreateTable, TableModel, TableNewProcessor}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DecimalType, StructType}
+import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -146,7 +150,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
-      dataSchema: StructType): String = {
+      dataSchema: StructType) = {
 
     val dbName: String = parameters.getOrElse("dbName",
       CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
@@ -158,7 +162,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else {
         CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName"
       }
     } catch {
       case ex: NoSuchTableException =>
@@ -168,7 +172,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
           dbName,
           tableName)
         CreateTable(cm, false).run(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        getPathForTable(sparkSession, dbName, tableName, parameters)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
     }
@@ -195,9 +199,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       if (parameters.contains("tablePath")) {
         parameters.get("tablePath").get
       } else {
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession)
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
+        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        relation.tableMeta.tablePath
       }
     } catch {
       case ex: Exception =>
@@ -234,22 +238,46 @@ object CarbonSource {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
-    if (metaStore.isReadFromHiveMetaStore && !properties.contains("carbonSchemaPartsNo")) {
+    if (!properties.contains("carbonSchemaPartsNo")) {
       val dbName: String = properties.getOrElse("dbName",
         CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
       val tableName: String = properties.getOrElse("tableName", "").toLowerCase
       val model = createTableInfoFromParams(properties, tableDesc.schema, dbName, tableName)
       val tableInfo: TableInfo = TableNewProcessor(model)
-      val (tablePath, carbonSchemaString) =
-      metaStore.createTableFromThrift(tableInfo, dbName, tableName)(sparkSession)
-      val map = CarbonUtil.convertToMultiStringMap(tableInfo)
+      val tablePath = CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + tableName
+      val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+      schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+      tableInfo.getFactTable.getSchemaEvalution.
+        getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+      val map = if (metaStore.isReadFromHiveMetaStore) {
+        val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+        val schemaMetadataPath =
+          CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+        tableInfo.setMetaDataFilepath(schemaMetadataPath)
+        tableInfo.setStorePath(tableIdentifier.getStorePath)
+        CarbonUtil.convertToMultiStringMap(tableInfo)
+      } else {
+        metaStore.saveToDisk(tableInfo, tablePath)
+        new java.util.HashMap[String, String]()
+      }
       properties.foreach(e => map.put(e._1, e._2))
       map.put("tablePath", tablePath)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
       tableDesc.copy(storage = updatedFormat)
     } else {
-      tableDesc
+      val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
+      if (!metaStore.isReadFromHiveMetaStore) {
+        // save to disk
+        metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+        // remove schema string from map as we don't store carbon schema to hive metastore
+        val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
+        val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
+        tableDesc.copy(storage = updatedFormat)
+      } else {
+        tableDesc
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 1cc6668..33bba8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -102,7 +102,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       CarbonAliasDecoderRelation(),
       rdd,
       output,
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).carbonMetastore.storePath,
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
       table.carbonTable.getTableInfo.serialize())
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 0d5a821..17e456d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
 import org.apache.spark.util.AlterTableUtil
 
@@ -29,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -167,11 +168,12 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       locks = AlterTableUtil
         .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
             sparkSession)
-      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta.carbonTable
+      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].tableMeta
+      carbonTable = tableMeta.carbonTable
       // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath.
+        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
       val tableMetadataFile = carbonTablePath.getPath
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
@@ -196,7 +198,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           carbonTable.getCarbonTableIdentifier,
           tableInfo,
           schemaEvolutionEntry,
-          carbonTable.getStorePath)(sparkSession)
+          tableMeta.tablePath)(sparkSession)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
         .runSqlHive(
@@ -206,6 +208,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
           s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
           s"('tableName'='$newTableName', " +
           s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+        Some(oldDatabaseName)).quotedString)
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 609f39b..2731104 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -41,8 +41,8 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
         CarbonDropTableCommand(true, tableName.database, tableName.table).run(sparkSession)
       }
     }
-    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase, CarbonEnv.getInstance(sparkSession)
-      .carbonMetastore.storePath)
+    CarbonUtil.dropDatabaseDirectory(dbName.toLowerCase,
+      CarbonEnv.getInstance(sparkSession).storePath)
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 760cb06..18d2dc7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -62,8 +62,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       _, child: LogicalPlan, _, _) =>
         ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).
-          carbonMetastore.storePath)
+        CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index cc18fa3..eeb2022 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -156,7 +156,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
     carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+    carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
 
     var storeLocation = CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
@@ -196,7 +196,9 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
     val tbName = cm.tableName
@@ -218,10 +220,11 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
     } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val (tablePath, carbonSchemaString) =
-        catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
       if (createDSTable) {
         try {
           val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
@@ -239,7 +242,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
             CarbonEnv.getInstance(sparkSession).carbonMetastore
-                .dropTable(catalog.storePath, identifier)(sparkSession)
+                .dropTable(tablePath, identifier)(sparkSession)
 
             LOGGER.audit(s"Table creation with Database name [$dbName] " +
                 s"and Table name [$tbName] failed")
@@ -332,9 +335,7 @@ object LoadTable {
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 
     // update the schema modified time
-    metastore.updateAndTouchSchemasUpdatedTime(
-      carbonLoadModel.getDatabaseName,
-      carbonLoadModel.getTableName)
+    metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
 
     // update CarbonDataLoadSchema
     val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
@@ -526,7 +527,7 @@ case class LoadTable(
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
+      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
 
       val table = relation.tableMeta.carbonTable
       carbonLoadModel.setTableName(table.getFactTableName)
@@ -882,7 +883,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
+        dbName.toLowerCase, tableName.toLowerCase)
+    catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
@@ -891,7 +895,7 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
 
       CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .dropTable(catalog.storePath, identifier)(sparkSession)
+          .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } catch {
       case ex: Exception =>
@@ -910,10 +914,11 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val metadataFilePath = CarbonStorePath
-        .getCarbonTablePath(catalog.storePath, carbonTableIdentifier).getMetadataDirectoryPath
+    val tableIdentifier =
+      AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
+    val metadataFilePath =
+      CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
     val fileType = FileFactory.getFileType(metadataFilePath)
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       val file = FileFactory.getCarbonFile(metadataFilePath, fileType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 549841b..2407054 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,22 +22,23 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -62,7 +63,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+class CarbonFileMetastore(conf: RuntimeConfig) extends CarbonMetaStore {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -77,7 +78,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     System.nanoTime() + ""
   }
 
-  lazy val metadata = loadMetadata(storePath, nextQueryId)
+  val metadata = MetaData(new ArrayBuffer[TableMeta]())
 
 
   /**
@@ -90,9 +91,22 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   override def createCarbonRelation(parameters: Map[String, String],
       absIdentifier: AbsoluteTableIdentifier,
       sparkSession: SparkSession): CarbonRelation = {
-    lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
-      Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
-      .asInstanceOf[CarbonRelation]
+    val database = absIdentifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = absIdentifier.getCarbonTableIdentifier.getTableName
+    val tables = getTableFromMetadataCache(database, tableName)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableName,
+          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+      case None =>
+        readCarbonSchema(absIdentifier) match {
+          case Some(meta) =>
+            CarbonRelation(database, tableName,
+              CarbonSparkUtil.createSparkMeta(meta.carbonTable), meta)
+          case None =>
+            throw new NoSuchTableException(database, tableName)
+        }
+    }
   }
 
   def lookupRelation(dbName: Option[String], tableName: String)
@@ -100,20 +114,21 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
-  def lookupRelation(tableIdentifier: TableIdentifier)
+  override def lookupRelation(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): LogicalPlan = {
-    checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase
-    )
-    val tables = getTableFromMetadata(database, tableIdentifier.table, true)
-    tables match {
-      case Some(t) =>
-        CarbonRelation(database, tableIdentifier.table,
-          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head)
-      case None =>
-        throw new NoSuchTableException(database, tableIdentifier.table)
+      sparkSession.catalog.currentDatabase)
+    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      case SubqueryAlias(_,
+      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+      _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case LogicalRelation(
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
     }
+    relation
   }
 
   /**
@@ -123,8 +138,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    * @return
    */
-  def getTableFromMetadata(database: String,
-      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+  def getTableFromMetadataCache(database: String, tableName: String): Option[TableMeta] = {
     metadata.tablesMeta
       .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
                  c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
@@ -136,99 +150,48 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    tableExists(TableIdentifier(table, databaseOp))(sparkSession)
   }
 
-  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    checkSchemasModifiedTimeAndReloadTables()
-    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
-    tables.nonEmpty
-  }
-
-  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
-    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
-    val statistic = new QueryStatistic()
-    // creating zookeeper instance once.
-    // if zookeeper is configured as carbon lock type.
-    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
-    if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
-    }
-    if (metadataPath == null) {
-      return null
-    }
-    // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
-        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
-          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
-        )
-      LOGGER.info("Default lock type HDFSLOCK is configured")
+  override def tableExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    try {
+      lookupRelation(tableIdentifier)(sparkSession)
+    } catch {
+      case e: Exception =>
+        return false
     }
-    val fileType = FileFactory.getFileType(metadataPath)
-    val metaDataBuffer = new ArrayBuffer[TableMeta]
-    fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
-      System.currentTimeMillis())
-    recorder.recordStatisticsForDriver(statistic, queryId)
-    MetaData(metaDataBuffer)
+    true
   }
 
-  private def fillMetaData(basePath: String, fileType: FileType,
-      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
-    val databasePath = basePath // + "/schemas"
-    try {
-      if (FileFactory.isFileExist(databasePath, fileType)) {
-        val file = FileFactory.getCarbonFile(databasePath, fileType)
-        val databaseFolders = file.listFiles()
-
-        databaseFolders.foreach(databaseFolder => {
-          if (databaseFolder.isDirectory) {
-            val dbName = databaseFolder.getName
-            val tableFolders = databaseFolder.listFiles()
-
-            tableFolders.foreach(tableFolder => {
-              if (tableFolder.isDirectory) {
-                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                  tableFolder.getName, UUID.randomUUID().toString)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
-                  carbonTableIdentifier)
-                val tableMetadataFile = carbonTablePath.getSchemaFilePath
-
-                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-                  val tableName = tableFolder.getName
-                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-                  val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
-                  val wrapperTableInfo = schemaConverter
-                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
-                  val schemaFilePath = CarbonStorePath
-                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
-                  wrapperTableInfo.setStorePath(storePath)
-                  wrapperTableInfo
-                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
-                    carbonTable)
-                }
-              }
-            })
-          }
-        })
-      } else {
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        s.printStackTrace()
-        // Create folders and files.
-        FileFactory.mkdirs(databasePath, fileType)
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[TableMeta] = {
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+    val tableName = identifier.getCarbonTableIdentifier.getTableName
+    val storePath = identifier.getStorePath
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
+      tableName.toLowerCase(), UUID.randomUUID().toString)
+    val carbonTablePath =
+      CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val fileType = FileFactory.getFileType(tableMetadataFile)
+    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+      val tableUniqueName = dbName + "_" + tableName
+      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+      val schemaFilePath = CarbonStorePath
+        .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+      wrapperTableInfo.setStorePath(storePath)
+      wrapperTableInfo
+        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+      val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+      val tableMeta = new TableMeta(carbonTable.getCarbonTableIdentifier,
+        identifier.getStorePath,
+        identifier.getTablePath,
+        carbonTable)
+      metadata.tablesMeta += tableMeta
+      Some(tableMeta)
+    } else {
+      None
     }
   }
 
@@ -238,15 +201,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String) (sparkSession: SparkSession): String = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
@@ -255,11 +218,19 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
       .fromExternalToWrapperTableInfo(thriftTableInfo,
           newTableIdentifier.getDatabaseName,
           newTableIdentifier.getTableName,
-          carbonStorePath)
-    createSchemaThriftFile(wrapperTableInfo,
+        absoluteTableIdentifier.getStorePath)
+    val identifier =
+      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        wrapperTableInfo.getFactTable.getTableId)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
+      identifier)
+    addTableCache(wrapperTableInfo,
+      AbsoluteTableIdentifier.from(absoluteTableIdentifier.getStorePath,
       newTableIdentifier.getDatabaseName,
-      newTableIdentifier.getTableName)(sparkSession)
+      newTableIdentifier.getTableName))
+    path
   }
 
   /**
@@ -267,25 +238,27 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
+   * @param tablePath
    * @param sparkSession
    */
   def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      carbonStorePath: String)
-    (sparkSession: SparkSession): String = {
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val wrapperTableInfo = schemaConverter
       .fromExternalToWrapperTableInfo(thriftTableInfo,
         carbonTableIdentifier.getDatabaseName,
         carbonTableIdentifier.getTableName,
-        carbonStorePath)
+        tableIdentifier.getStorePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    createSchemaThriftFile(wrapperTableInfo,
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
       thriftTableInfo,
-      carbonTableIdentifier.getDatabaseName,
-      carbonTableIdentifier.getTableName)(sparkSession)
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
   }
 
 
@@ -296,24 +269,38 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * Load CarbonTable from wrapper tableInfo
    *
    */
-  def createTableFromThrift(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
-    if (tableExists(tableName, Some(dbName))(sparkSession)) {
-      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
-    }
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+  def saveToDisk(tableInfo: schema.table.TableInfo, tablePath: String) {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val dbName = tableInfo.getDatabaseName
+    val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
-    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
-      .add(schemaEvolutionEntry)
-    val carbonTablePath = createSchemaThriftFile(tableInfo,
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    tableInfo.setStorePath(identifier.getStorePath)
+    createSchemaThriftFile(tableInfo,
       thriftTableInfo,
-      dbName,
-      tableName)(sparkSession)
+      identifier.getCarbonTableIdentifier)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
-    (carbonTablePath, "")
+  }
+
+  /**
+   * Generates schema string from TableInfo
+   */
+  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+      tablePath: String): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(tableIdentifier.getStorePath)
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    removeTableFromMetadata(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    addTableCache(tableInfo, tableIdentifier)
+    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
   }
 
   /**
@@ -321,23 +308,16 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    *
    * @param tableInfo
    * @param thriftTableInfo
-   * @param dbName
-   * @param tableName
-   * @param sparkSession
    * @return
    */
-  private def createSchemaThriftFile(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
+      thriftTableInfo: TableInfo,
+      carbonTableIdentifier: CarbonTableIdentifier): String = {
+    val carbonTablePath = CarbonStorePath.
+      getCarbonTablePath(tableInfo.getStorePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -346,13 +326,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     thriftWriter.open(FileWriteOperation.OVERWRITE)
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-    removeTableFromMetadata(dbName, tableName)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(tableInfo.getStorePath))
+    carbonTablePath.getPath
+  }
+
+  protected def addTableCache(tableInfo: table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+    val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
+    CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
+    removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+    val tableMeta = new TableMeta(identifier, absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getTablePath,
+      CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName))
     metadata.tablesMeta += tableMeta
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
   }
 
   /**
@@ -362,13 +349,15 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
    * @param tableName
    */
   def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadataCache(dbName, tableName)
     metadataToBeRemoved match {
       case Some(tableMeta) =>
         metadata.tablesMeta -= tableMeta
         CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
       case None =>
-        LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        if (LOGGER.isDebugEnabled) {
+          LOGGER.debug(s"No entry for table $tableName in database $dbName")
+        }
     }
   }
 
@@ -402,23 +391,23 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
 
 
   def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = tableIdentifier.table.toLowerCase
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
+    try {
+      val tablePath = lookupRelation(tableIdentifier)(sparkSession).
+        asInstanceOf[CarbonRelation].tableMeta.tablePath
+      val fileType = FileFactory.getFileType(tablePath)
+      FileFactory.isFileExist(tablePath, fileType)
+    } catch {
+      case e: Exception =>
+        false
+    }
   }
 
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -429,26 +418,18 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
       // in the other beeline need to update.
-      checkSchemasModifiedTimeAndReloadTables
-
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
+      checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+
+      removeTableFromMetadata(dbName, tableName)
+      updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
     }
   }
 
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+  private def getTimestampFileAndType(basePath: String) = {
+    val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
     (timestampFile, timestampFileType)
   }
@@ -462,37 +443,20 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     tableModifiedTimeStore.put("default", timeStamp)
   }
 
-  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+  def updateAndTouchSchemasUpdatedTime(basePath: String) {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(basePath))
   }
 
-  /**
-   * This method will read the timestamp of empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
 
   /**
    * This method will check and create an empty schema timestamp file
    *
-   * @param databaseName
-   * @param tableName
    * @return
    */
-  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+  private def touchSchemaFileSystemTime(basePath: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(basePath)
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      LOGGER.audit(s"Creating timestamp file for $basePath")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
     val systemTime = System.currentTimeMillis()
@@ -501,8 +465,9 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
     systemTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+  def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
+    val (timestampFile, timestampFileType) =
+      getTimestampFileAndType(storePath)
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
@@ -513,7 +478,7 @@ class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends Ca
   }
 
   private def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+    metadata.tablesMeta.clear()
   }
 
   override def isReadFromHiveMetaStore: Boolean = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34d2870a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a8f92ce..c328130 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -16,21 +16,15 @@
  */
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
@@ -41,12 +35,10 @@ import org.apache.carbondata.spark.util.CarbonSparkUtil
 /**
  * Metastore to store carbonschema in hive
  */
-class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
-  extends CarbonFileMetastore(conf, storePath) {
+class CarbonHiveMetaStore(conf: RuntimeConfig) extends CarbonFileMetastore(conf) {
 
   override def isReadFromHiveMetaStore: Boolean = true
 
-
   /**
    * Create spark session from paramters.
    *
@@ -61,7 +53,7 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     if (info != null) {
       val table = CarbonTable.buildFromTableInfo(info)
       val meta = new TableMeta(table.getCarbonTableIdentifier,
-        table.getStorePath, table)
+        absIdentifier.getStorePath, absIdentifier.getTablePath, table)
       CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
         CarbonSparkUtil.createSparkMeta(table), meta)
     } else {
@@ -69,111 +61,30 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
     }
   }
 
-  override def lookupRelation(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): LogicalPlan = {
-    val database = tableIdentifier.database.getOrElse(
-      sparkSession.catalog.currentDatabase)
-    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-      case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
-      _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case LogicalRelation(
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        carbonDatasourceHadoopRelation.carbonRelation
-      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
-    }
-    relation
-  }
-
-  /**
-   * This method will search for a table in the catalog metadata
-   *
-   * @param database
-   * @param tableName
-   * @return
-   */
-  override def getTableFromMetadata(database: String,
-      tableName: String,
-      readStore: Boolean): Option[TableMeta] = {
-    if (!readStore) {
-      None
-    } else {
-      super.getTableFromMetadata(database, tableName, readStore)
-    }
-  }
-
-  override def tableExists(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): Boolean = {
-    try {
-      lookupRelation(tableIdentifier)(sparkSession)
-    } catch {
-      case e: Exception =>
-        return false
-    }
-    true
-  }
-
-  override def loadMetadata(metadataPath: String,
-      queryId: String): MetaData = {
-    MetaData(new ArrayBuffer[TableMeta])
-  }
-
-
-  /**
-   *
-   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
-   * Load CarbonTable from wrapper tableInfo
-   *
-   */
-  override def createTableFromThrift(tableInfo: TableInfo, dbName: String,
-      tableName: String)(sparkSession: SparkSession): (String, String) = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
-    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    removeTableFromMetadata(dbName, tableName)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ","))
-  }
-
-  /**
-   * This method will remove the table meta from catalog metadata array
-   *
-   * @param dbName
-   * @param tableName
-   */
-  override def removeTableFromMetadata(dbName: String,
-      tableName: String): Unit = {
-    // do nothing
-  }
 
   override def isTablePathExists(tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Boolean = {
     tableExists(tableIdentifier)(sparkSession)
   }
 
-  override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+  override def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): Unit = {
     val dbName = tableIdentifier.database.get
     val tableName = tableIdentifier.table
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
+    checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+    removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTables(): Unit = {
+  override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {
     // do nothing now
   }
 
@@ -200,23 +111,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
    * @param newTableIdentifier
    * @param thriftTableInfo
    * @param schemaEvolutionEntry
-   * @param carbonStorePath
    * @param sparkSession
    */
   override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
     updateHiveMetaStore(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
-      carbonStorePath,
+      identifier.getStorePath,
       sparkSession,
       schemaConverter)
   }
@@ -232,23 +143,20 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
         newTableIdentifier.getDatabaseName,
         newTableIdentifier.getTableName,
         carbonStorePath)
-    wrapperTableInfo.setStorePath(storePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier)
+    wrapperTableInfo.setStorePath(carbonStorePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
     val schemaMetadataPath =
       CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
     wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
-    val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      wrapperTableInfo.getFactTable.getTableId)
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
       s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
-    removeTableFromMetadata(wrapperTableInfo.getDatabaseName,
-      wrapperTableInfo.getFactTable.getTableName)
+    removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath
+    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
   }
 
   /**
@@ -256,21 +164,23 @@ class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
-   * @param carbonStorePath
    * @param sparkSession
    */
   override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String)
+      tablePath: String)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
     updateHiveMetaStore(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
-      carbonStorePath,
+      identifier.getStorePath,
       sparkSession,
       schemaConverter)
   }
+
+
 }


Mime
View raw message