carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/4] carbondata git commit: [CARBONDATA-2025] Unify all path construction through CarbonTablePath static method
Date Thu, 01 Feb 2018 07:45:44 GMT
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 15b4e192e -> c3e99681b


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 44204d4..f5a90de 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,8 +32,10 @@ import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
@@ -189,7 +191,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("batch_table", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
@@ -197,7 +198,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       // use thread pool to catch the exception of sink thread
       val pool = Executors.newSingleThreadExecutor()
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier)
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
@@ -220,11 +221,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val csvDataDir = new File("target/csvdata").getCanonicalPath
     // streaming ingest 10 rows
     generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
-    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
       identifier)
     thread.start()
     Thread.sleep(2000)
@@ -646,12 +646,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
       val thread1 = createWriteSocketThread(server, 2, 10, 3)
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier, "force", 5, 1024L * 200, false)
       thread1.start()
       thread2.start()
       Thread.sleep(1000)
@@ -749,7 +748,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -770,7 +769,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -808,7 +807,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -821,7 +819,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        carbonTable = carbonTable,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
@@ -863,7 +861,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   def createFileStreamingThread(
       spark: SparkSession,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       csvDataDir: String,
       intervalSecond: Int,
       tableIdentifier: TableIdentifier): Thread = {
@@ -889,7 +887,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .start()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 9a6efbe..97dc8ba 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -92,7 +92,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       }
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest")
 
-      assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+      assert(new File(carbonTable.getMetadataPath).listFiles().length < 6)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index a8db6c9..bbc3697 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -56,43 +55,39 @@ public class TableProcessingOperations {
    */
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    String metaDataLocation = carbonTable.getMetadataPath();
     final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String partitionPath = carbonTablePath.getPartitionDir();
-      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+    if (FileFactory.isFileExist(partitionPath, fileType)) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile path) {
           String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+          boolean found = false;
+          for (int j = 0; j < details.length; j++) {
+            if (details[j].getLoadName().equals(segmentId)) {
+              found = true;
+              break;
             }
           }
+          return !found;
+        }
+      });
+      for (int k = 0; k < listFiles.length; k++) {
+        String segmentId =
+            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        if (isCompactionFlow) {
+          if (segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
+        } else {
+          if (!segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4cd5014..193d192 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -105,12 +103,11 @@ public class FieldEncoderFactory {
           ColumnIdentifier parentColumnIdentifier =
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
                   dataField.getColumn().getDataType());
-          CarbonTablePath carbonTablePath =
-              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
           AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
               AbsoluteTableIdentifier.from(
-              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
-              parentTableIdentifier);
+                  CarbonTablePath.getNewTablePath(
+                      absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()),
+                  parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
           return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..a08177a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger;
 
 import java.util.List;
 
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor {
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);
 
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonTable carbonTable,
-      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
       long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
-              carbonTable.getCarbonTableIdentifier()));
+          loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.
       long index = taskNo + 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index d796262..4fa4ff4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -263,7 +263,7 @@ public class CarbonCompactionUtil {
   public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
       List<CarbonTableIdentifier> skipList) {
     for (CarbonTable ctable : carbonTables) {
-      String metadataPath = ctable.getMetaDataFilepath();
+      String metadataPath = ctable.getMetadataPath();
       // check for the compaction required file and at the same time exclude the tables which are
       // present in the skip list.
       if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 0eadc7f..c43dbf9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -31,7 +31,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
@@ -42,7 +41,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -167,15 +165,13 @@ public final class CarbonDataMergerUtil {
     // End Timestamp.
 
     // Table Update Status Metadata Update.
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(identifier);
 
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
     ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
     ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
@@ -222,7 +218,7 @@ public final class CarbonDataMergerUtil {
           }
 
           LoadMetadataDetails[] loadDetails =
-              segmentStatusManager.readLoadMetadata(metaDataFilepath);
+              SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
           for (LoadMetadataDetails loadDetail : loadDetails) {
             if (loadsToMerge.contains(loadDetail)) {
@@ -235,18 +231,18 @@ public final class CarbonDataMergerUtil {
             }
           }
 
-          segmentUpdateStatusManager
-              .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
-          segmentStatusManager
-              .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+          segmentUpdateStatusManager.writeLoadDetailsIntoFile(
+              Arrays.asList(updateLists), timestamp);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
           status = true;
         } else {
           LOGGER.error("Not able to acquire the lock.");
           status = false;
         }
       } catch (IOException e) {
-        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
-            .getMetadataDirectoryPath());
+        LOGGER.error("Error while updating metadata. The metadata file path is " +
+            CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         status = false;
 
       } finally {
@@ -282,9 +278,9 @@ public final class CarbonDataMergerUtil {
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
       CompactionType compactionType) throws IOException {
     boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -293,10 +289,7 @@ public final class CarbonDataMergerUtil {
         LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
             + carbonLoadModel.getTableName() + " for table status updation ");
 
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+        String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
         LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
@@ -595,10 +588,6 @@ public final class CarbonDataMergerUtil {
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
-    CarbonTableIdentifier tableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-
-
     // total length
     long totalLength = 0;
 
@@ -613,7 +602,7 @@ public final class CarbonDataMergerUtil {
       String segId = segment.getLoadName();
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition =
-          getSizeOfSegment(tablePath, tableIdentifier, segId);
+          getSizeOfSegment(tablePath, segId);
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
       if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -652,35 +641,17 @@ public final class CarbonDataMergerUtil {
   /**
    * For calculating the size of the specified segment
    * @param tablePath the store path of the segment
-   * @param tableIdentifier identifier of table that the segment belong to
    * @param segId segment id
    * @return the data size of the segment
    */
-  private static long getSizeOfSegment(String tablePath,
-      CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
+  private static long getSizeOfSegment(String tablePath, String segId) {
+    String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId);
     CarbonFile segmentFolder =
         FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
     return getSizeOfFactFileInLoad(segmentFolder);
   }
 
   /**
-   * This method will get the store location for the given path, segemnt id and partition id
-   *
-   * @param tablePath
-   * @param carbonTableIdentifier identifier of catbon table that the segment belong to
-   * @param segmentId segment id
-   * @return the store location of the segment
-   */
-  private static String getStoreLocation(String tablePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
-    return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-  }
-
-
-  /**
    * Identify the segments to be merged based on the segment count
    *
    * @param listOfSegmentsAfterPreserve the list of segments after
@@ -1022,21 +993,19 @@ public final class CarbonDataMergerUtil {
    * if UpdateDelta Files are more than IUD Compaction threshold.
    *
    * @param seg
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param segmentUpdateStatusManager
    * @param numberDeltaFilesThreshold
    * @return
    */
   public static Boolean checkUpdateDeltaFilesInSeg(String seg,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
+      AbsoluteTableIdentifier identifier,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg);
+    String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();
@@ -1282,15 +1251,12 @@ public final class CarbonDataMergerUtil {
     CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
 
     // Update the Table Status.
-    String metaDataFilepath = table.getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
+    String metaDataFilepath = table.getMetadataPath();
+    AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
 
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -1304,7 +1270,7 @@ public final class CarbonDataMergerUtil {
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+                SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
           if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
@@ -1313,7 +1279,7 @@ public final class CarbonDataMergerUtil {
           }
         }
         try {
-          segmentStatusManager
+          SegmentStatusManager
                   .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
         } catch (IOException e) {
           return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index ff65db2..8fc6e66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -404,8 +404,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
             tempStoreLocation);
-    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
         CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 3d0700b..6f506b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -72,8 +72,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
             tempStoreLocation);
-    setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 9f3c86f..bc87823 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -308,8 +307,7 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-            tableName, loadModel.getSegmentId());
+        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getSegmentId());
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
     boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -334,10 +332,9 @@ public class CarbonFactDataHandlerModel {
    * @return data directory path
    */
   private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
-    AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    AbsoluteTableIdentifier identifier = configuration.getTableIdentifier();
     String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId());
+        CarbonTablePath.getSegmentPath(identifier.getTablePath(), configuration.getSegmentId());
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index cfe6e31..ccde9e1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -44,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -143,12 +141,9 @@ public final class CarbonDataProcessorUtil {
     String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator);
     String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length];
 
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
       String tmpStore = baseTmpStorePathArray[i];
-      CarbonTablePath carbonTablePath =
-          CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
-      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+      String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId);
 
       localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
     }
@@ -375,12 +370,9 @@ public final class CarbonDataProcessorUtil {
    * @return data directory path
    */
   public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
-      String databaseName, String tableName, String segmentId) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
-    String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+      String segmentId) {
+    String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
+        factStoreLocation, segmentId);
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 7be61d9..c2f4501 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -54,7 +53,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
@@ -73,11 +71,8 @@ public final class CarbonLoaderUtil {
   }
 
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
     deleteStorePath(segmentPath);
   }
 
@@ -90,33 +85,26 @@ public final class CarbonLoaderUtil {
    */
   public static boolean isValidSegment(CarbonLoadModel loadModel,
       int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
-        .getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     int fileCount = 0;
-    int partitionCount = carbonTable.getPartitionCount();
-    for (int i = 0; i < partitionCount; i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
-          currentLoad + "");
-      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
-          FileFactory.getFileType(segmentPath));
-      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
-        @Override
-        public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(
-              CarbonTablePath.getCarbonIndexExtension())
-              || file.getName().endsWith(
-              CarbonTablePath.getCarbonDataExtension());
-        }
-
-      });
-      fileCount += files.length;
-      if (files.length > 0) {
-        return true;
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+        FileFactory.getFileType(segmentPath));
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+      @Override
+      public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(
+            CarbonTablePath.getCarbonIndexExtension())
+            || file.getName().endsWith(
+            CarbonTablePath.getCarbonDataExtension());
       }
+
+    });
+    fileCount += files.length;
+    if (files.length > 0) {
+      return true;
     }
     if (fileCount == 0) {
       return false;
@@ -149,16 +137,15 @@ public final class CarbonLoaderUtil {
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
       throws IOException {
     boolean status = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+    String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
     FileType fileType = FileFactory.getFileType(metadataPath);
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType);
     }
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil
         .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
@@ -172,7 +159,8 @@ public final class CarbonLoaderUtil {
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         List<CarbonFile> staleFolders = new ArrayList<>();
@@ -198,13 +186,13 @@ public final class CarbonLoaderUtil {
           // is triggered
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-                && segmentStatusManager.checkIfValidLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                && SegmentStatusManager.checkIfValidLoadInProgress(
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert overwrite is in progress");
             } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
-                && segmentStatusManager.checkIfValidLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                && SegmentStatusManager.checkIfValidLoadInProgress(
+                identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert into or load is in progress");
             }
           }
@@ -227,7 +215,7 @@ public final class CarbonLoaderUtil {
                 entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
                 // For insert overwrite, we will delete the old segment folder immediately
                 // So collect the old segments here
-                addToStaleFolders(carbonTablePath, staleFolders, entry);
+                addToStaleFolders(identifier, staleFolders, entry);
               }
             }
           }
@@ -236,7 +224,7 @@ public final class CarbonLoaderUtil {
         // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
         // so empty segment folder should be deleted
         if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
-          addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+          addToStaleFolders(identifier, staleFolders, newMetaEntry);
         }
 
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
@@ -270,9 +258,10 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+  private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
-    String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
+    String path = CarbonTablePath.getSegmentPath(
+        identifier.getTablePath(), entry.getLoadName());
     // add to the deletion list only if file exist else HDFS file system will throw
     // exception while deleting the file if file path does not exist
     if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -298,11 +287,9 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier,
+  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
       List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
     DataOutputStream dataOutputStream;
     Gson gsonObjectToWrite = new Gson();
@@ -838,10 +825,8 @@ public final class CarbonLoaderUtil {
    * This method will get the store location for the given path, segment id and partition id
    */
   public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
-    String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+    String segmentFolder = CarbonTablePath.getSegmentPath(
+        carbonTable.getTablePath(), segmentId);
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
@@ -870,10 +855,8 @@ public final class CarbonLoaderUtil {
    */
   public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
       String segmentId, CarbonTable carbonTable) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
     Map<String, Long> dataIndexSize =
-        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+        CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(), segmentId);
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
     loadMetadataDetails.setDataSize(String.valueOf(dataSize));
     Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index f9f3e20..1fdce32 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public final class DeleteLoadFolders {
@@ -47,15 +46,14 @@ public final class DeleteLoadFolders {
   /**
    * returns segment path
    *
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param oneLoad
    * @return
    */
-  private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
+  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
       LoadMetadataDetails oneLoad) {
-    CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String segmentId = oneLoad.getLoadName();
-    return carbon.getCarbonDataDirectoryPath(segmentId);
+    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
   }
 
   public static void physicalFactAndMeasureMetadataDeletion(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 7925b35..e059b35 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -73,12 +73,12 @@ public class BlockIndexStoreTest extends TestCase {
 //            file.length(), ColumnarFormatVersion.V1, null);
 //    CarbonTableIdentifier carbonTableIdentifier =
 //            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//    AbsoluteTableIdentifier identifier =
 //        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
 //    try {
 //
 //      List<TableBlockUniqueIdentifier> tableBlockInfoList =
-//          getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
+//          getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), identifier);
 //      List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList);
 //      assertTrue(loadAndGetBlocks.size() == 1);
 //    } catch (Exception e) {
@@ -86,7 +86,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    }
 //    List<String> segmentIds = new ArrayList<>();
 //      segmentIds.add(info.getSegmentId());
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+//    cache.removeTableBlocks(segmentIds, identifier);
 //  }
 //
   private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
@@ -122,19 +122,19 @@ public class BlockIndexStoreTest extends TestCase {
 //
 //    CarbonTableIdentifier carbonTableIdentifier =
 //            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//    AbsoluteTableIdentifier identifier =
 //        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
 //    ExecutorService executor = Executors.newFixedThreadPool(3);
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(
 //        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-//            absoluteTableIdentifier));
+//            identifier));
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(
 //        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-//            absoluteTableIdentifier));
+//            identifier));
 //    executor.shutdown();
 //    try {
 //      executor.awaitTermination(1, TimeUnit.DAYS);
@@ -145,7 +145,7 @@ public class BlockIndexStoreTest extends TestCase {
 //        Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 });
 //    try {
 //      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
+//          getTableBlockUniqueIdentifierList(tableBlockInfos, identifier);
 //      List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers);
 //      assertTrue(loadAndGetBlocks.size() == 5);
 //    } catch (Exception e) {
@@ -155,7 +155,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
 //      segmentIds.add(tableBlockInfo.getSegmentId());
 //    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+//    cache.removeTableBlocks(segmentIds, identifier);
 //  }
 //
 //  public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
@@ -193,18 +193,18 @@ public class BlockIndexStoreTest extends TestCase {
 //
 //    CarbonTableIdentifier carbonTableIdentifier =
 //            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//    AbsoluteTableIdentifier identifier =
 //        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
 //    ExecutorService executor = Executors.newFixedThreadPool(3);
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(
 //        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-//            absoluteTableIdentifier));
+//            identifier));
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //
 //    executor.shutdown();
 //    try {
@@ -217,7 +217,7 @@ public class BlockIndexStoreTest extends TestCase {
 //        .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 });
 //    try {
 //      List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
+//          getTableBlockUniqueIdentifierList(tableBlockInfos, identifier);
 //      List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList);
 //      assertTrue(loadAndGetBlocks.size() == 8);
 //    } catch (Exception e) {
@@ -227,7 +227,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
 //      segmentIds.add(tableBlockInfo.getSegmentId());
 //    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+//    cache.removeTableBlocks(segmentIds, identifier);
 //  }
 
   private class BlockLoaderThread implements Callable<Void> {
@@ -250,7 +250,7 @@ public class BlockIndexStoreTest extends TestCase {
   }
 
   private static File getPartFile() {
-    String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+    String path = StoreCreator.getIdentifier().getTablePath()
         + "/Fact/Part0/Segment_0";
     File file = new File(path);
     File[] files = file.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 7f0aef6..d42dcde 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  */
 public class StoreCreator {
 
-  private static AbsoluteTableIdentifier absoluteTableIdentifier;
+  private static AbsoluteTableIdentifier identifier;
   private static String storePath = "";
   static {
     try {
       storePath = new File("target/store").getCanonicalPath();
       String dbName = "testdb";
       String tableName = "testtable";
-      absoluteTableIdentifier =
+      identifier =
           AbsoluteTableIdentifier.from(
               storePath + "/testdb/testtable",
               new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
@@ -114,8 +113,8 @@ public class StoreCreator {
     }
   }
 
-  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
+  public static AbsoluteTableIdentifier getIdentifier() {
+    return identifier;
   }
 
   /**
@@ -134,12 +133,12 @@ public class StoreCreator {
       CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
       CarbonLoadModel loadModel = new CarbonLoadModel();
       loadModel.setCarbonDataLoadSchema(schema);
-      loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+      loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-      loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+      loadModel.setTablePath(identifier.getTablePath());
       loadModel.setDateFormat(null);
       loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
           CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -175,9 +174,9 @@ public class StoreCreator {
 
   private static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
-    tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
-    tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
     List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
     ArrayList<Encoding> encodings = new ArrayList<>();
     encodings.add(Encoding.DICTIONARY);
@@ -257,16 +256,13 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+        identifier.getCarbonTableIdentifier().getTableUniqueName()
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
+    tableInfo.setTablePath(identifier.getTablePath());
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    String schemaFilePath = carbonTablePath.getSchemaFilePath();
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
@@ -329,7 +325,7 @@ public class StoreCreator {
       writer.close();
       writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+          new DictionaryColumnUniqueIdentifier(identifier,
         		  columnIdentifier, dims.get(i).getDataType()));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
@@ -444,7 +440,7 @@ public class StoreCreator {
     loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
-    String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator
+    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
         + CarbonCommonConstants.LOADMETADATA_FILENAME;
 
     DataOutputStream dataOutputStream;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7b823ac..8c9889d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
@@ -60,8 +59,6 @@ public class StreamSegment {
    * get stream segment or create new stream segment if not exists
    */
   public static String open(CarbonTable table) throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -72,7 +69,8 @@ public class StreamSegment {
                 + " for stream table get or create segment");
 
         LoadMetadataDetails[] details =
-            SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         LoadMetadataDetails streamSegment = null;
         for (LoadMetadataDetails detail : details) {
           if (FileFormat.ROW_V1 == detail.getFileFormat()) {
@@ -97,8 +95,8 @@ public class StreamSegment {
             newDetails[i] = details[i];
           }
           newDetails[i] = newDetail;
-          SegmentStatusManager
-              .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails);
           return newDetail.getLoadName();
         } else {
           return streamSegment.getLoadName();
@@ -126,8 +124,6 @@ public class StreamSegment {
    */
   public static String close(CarbonTable table, String segmentId)
       throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -138,7 +134,8 @@ public class StreamSegment {
                 + " for stream table finish segment");
 
         LoadMetadataDetails[] details =
-            SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         for (LoadMetadataDetails detail : details) {
           if (segmentId.equals(detail.getLoadName())) {
             detail.setLoadEndTime(System.currentTimeMillis());
@@ -162,7 +159,8 @@ public class StreamSegment {
         }
         newDetails[i] = newDetail;
         SegmentStatusManager
-            .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+                table.getTablePath()), newDetails);
         return newDetail.getLoadName();
       } else {
         LOGGER.error(
@@ -192,7 +190,7 @@ public class StreamSegment {
     try {
       if (statusLock.lockWithRetries()) {
         LoadMetadataDetails[] details =
-            SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+            SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
         boolean updated = false;
         for (LoadMetadataDetails detail : details) {
           if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
@@ -202,10 +200,8 @@ public class StreamSegment {
           }
         }
         if (updated) {
-          CarbonTablePath tablePath =
-              CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
           SegmentStatusManager.writeLoadDetailsIntoFile(
-              tablePath.getTableStatusFilePath(),
+              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
               details);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 197cb14..186d100 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
@@ -216,7 +216,6 @@ object StreamHandoffRDD {
   ): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = carbonTable.getAbsoluteTableIdentifier
-    val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
     var continueHandoff = false
     // require handoff lock on table
     val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
@@ -233,7 +232,7 @@ object StreamHandoffRDD {
           try {
             if (statusLock.lockWithRetries()) {
               loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                tablePath.getMetadataDirectoryPath)
+                CarbonTablePath.getMetadataPath(identifier.getTablePath))
             }
           } finally {
             if (null != statusLock) {
@@ -355,19 +354,16 @@ object StreamHandoffRDD {
       loadModel: CarbonLoadModel
   ): Boolean = {
     var status = false
-    val metaDataFilepath =
-      loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
-    val identifier =
-      loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+    val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
+    val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
     val fileType = FileFactory.getFileType(metadataPath)
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType)
     }
-    val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
     val segmentStatusManager = new SegmentStatusManager(identifier)
-    val carbonLock = segmentStatusManager.getTableStatusLock()
+    val carbonLock = segmentStatusManager.getTableStatusLock
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
@@ -400,7 +396,7 @@ object StreamHandoffRDD {
         status = true
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
-          .getDatabaseName() + "." + loadModel.getTableName());
+          .getDatabaseName() + "." + loadModel.getTableName())
       }
     } finally {
       if (carbonLock.unlock()) {
@@ -411,6 +407,6 @@ object StreamHandoffRDD {
                      "." + loadModel.getTableName() + " during table status updation")
       }
     }
-    return status
+    status
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index f2274be..c417fbe 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -126,16 +126,14 @@ object StreamSinkFactory {
    * @return
    */
   private def getStreamSegmentId(carbonTable: CarbonTable): String = {
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath)
-    if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) {
+    val segmentId = StreamSegment.open(carbonTable)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (!FileFactory.isFileExist(segmentDir, fileType)) {
       // Create table directory path, in case of enabling hive metastore first load may not have
       // table folder created.
-      FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
+      FileFactory.mkdirs(segmentDir, fileType)
     }
-    val segmentId = StreamSegment.open(carbonTable)
-    val segmentDir = carbonTablePath.getSegmentDir(segmentId)
     if (FileFactory.isFileExist(segmentDir, fileType)) {
       // recover fault
       StreamSegment.recoverSegmentIfRequired(segmentDir)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 45bc19a..ff483e5 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -62,9 +62,7 @@ class CarbonAppendableStreamSink(
     carbonLoadModel: CarbonLoadModel,
     server: Option[DictionaryServer]) extends Sink {
 
-  private val carbonTablePath = CarbonStorePath
-    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-  private val fileLogPath = carbonTablePath.getStreamingLogDir
+  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
   // prepare configuration
   private val hadoopConf = {
@@ -149,12 +147,12 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
     if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
       val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
-      val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
       FileFactory.mkdirs(newSegmentDir, fileType)
 
       // TODO trigger hand off operation
@@ -250,15 +248,13 @@ object CarbonAppendableStreamSink {
         }
 
         // update data file info in index file
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
+        StreamSegment.updateIndexFile(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
 
       } catch {
         // catch fault of executor side
         case t: Throwable =>
-          val tablePath =
-            CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-          val segmentDir = tablePath.getSegmentDir(segmentId)
+          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
           StreamSegment.recoverSegmentIfRequired(segmentDir)
           LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
           committer.abortJob(job)


Mime
View raw message