carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [19/50] [abbrv] carbondata git commit: [CARBONDATA-2025] Unify all path construction through CarbonTablePath static method
Date Sun, 04 Mar 2018 12:24:39 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index cd1e28a..d30891a 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -63,30 +63,6 @@ public class BlockIndexStoreTest extends TestCase {
 
   }
 
-//  public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment()
-//      throws IOException {
-//    File file = getPartFile();
-//    TableBlockInfo info =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    CarbonTableIdentifier carbonTableIdentifier =
-//            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
-//        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-//    try {
-//
-//      List<TableBlockUniqueIdentifier> tableBlockInfoList =
-//          getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
-//      List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList);
-//      assertTrue(loadAndGetBlocks.size() == 1);
-//    } catch (Exception e) {
-//      assertTrue(false);
-//    }
-//    List<String> segmentIds = new ArrayList<>();
-//      segmentIds.add(info.getSegment());
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-//  }
-//
   private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>();
@@ -95,138 +71,6 @@ public class BlockIndexStoreTest extends TestCase {
     }
     return tableBlockUniqueIdentifiers;
   }
-//
-//  public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently()
-//      throws IOException {
-//    String canonicalPath =
-//        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-//    File file = getPartFile();
-//    TableBlockInfo info =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    TableBlockInfo info1 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//
-//    TableBlockInfo info2 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    TableBlockInfo info3 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    TableBlockInfo info4 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//
-//    CarbonTableIdentifier carbonTableIdentifier =
-//            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
-//        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-//    ExecutorService executor = Executors.newFixedThreadPool(3);
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-//        absoluteTableIdentifier));
-//    executor.submit(
-//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-//            absoluteTableIdentifier));
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-//        absoluteTableIdentifier));
-//    executor.submit(
-//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-//            absoluteTableIdentifier));
-//    executor.shutdown();
-//    try {
-//      executor.awaitTermination(1, TimeUnit.DAYS);
-//    } catch (InterruptedException e) {
-//      e.printStackTrace();
-//    }
-//    List<TableBlockInfo> tableBlockInfos =
-//        Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 });
-//    try {
-//      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
-//      List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers);
-//      assertTrue(loadAndGetBlocks.size() == 5);
-//    } catch (Exception e) {
-//      assertTrue(false);
-//    }
-//    List<String> segmentIds = new ArrayList<>();
-//    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-//      segmentIds.add(tableBlockInfo.getSegment());
-//    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-//  }
-//
-//  public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
-//      throws IOException {
-//    String canonicalPath =
-//        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-//    File file = getPartFile();
-//    TableBlockInfo info =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info1 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    TableBlockInfo info2 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info3 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info4 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    TableBlockInfo info5 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-//            file.length(),ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info6 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    TableBlockInfo info7 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    CarbonTableIdentifier carbonTableIdentifier =
-//            new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
-//        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-//    ExecutorService executor = Executors.newFixedThreadPool(3);
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-//        absoluteTableIdentifier));
-//    executor.submit(
-//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-//            absoluteTableIdentifier));
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }),
-//        absoluteTableIdentifier));
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }),
-//        absoluteTableIdentifier));
-//
-//    executor.shutdown();
-//    try {
-//      executor.awaitTermination(1, TimeUnit.DAYS);
-//    } catch (InterruptedException e) {
-//      // TODO Auto-generated catch block
-//      e.printStackTrace();
-//    }
-//    List<TableBlockInfo> tableBlockInfos = Arrays
-//        .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 });
-//    try {
-//      List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
-//      List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList);
-//      assertTrue(loadAndGetBlocks.size() == 8);
-//    } catch (Exception e) {
-//      assertTrue(false);
-//    }
-//    List<String> segmentIds = new ArrayList<>();
-//    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-//      segmentIds.add(tableBlockInfo.getSegment());
-//    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-//  }
 
   private class BlockLoaderThread implements Callable<Void> {
     private List<TableBlockInfo> tableBlockInfoList;
@@ -248,7 +92,7 @@ public class BlockIndexStoreTest extends TestCase {
   }
 
   private static File getPartFile() {
-    String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+    String path = StoreCreator.getIdentifier().getTablePath()
         + "/Fact/Part0/Segment_0";
     File file = new File(path);
     File[] files = file.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 12b00db..f594585 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  */
 public class StoreCreator {
 
-  private static AbsoluteTableIdentifier absoluteTableIdentifier;
+  private static AbsoluteTableIdentifier identifier;
   private static String storePath = "";
   static {
     try {
       storePath = new File("target/store").getCanonicalPath();
       String dbName = "testdb";
       String tableName = "testtable";
-      absoluteTableIdentifier =
+      identifier =
           AbsoluteTableIdentifier.from(
               storePath + "/testdb/testtable",
               new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
@@ -114,8 +113,8 @@ public class StoreCreator {
     }
   }
 
-  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
+  public static AbsoluteTableIdentifier getIdentifier() {
+    return identifier;
   }
 
   /**
@@ -134,12 +133,12 @@ public class StoreCreator {
       CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
       CarbonLoadModel loadModel = new CarbonLoadModel();
       loadModel.setCarbonDataLoadSchema(schema);
-      loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+      loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-      loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+      loadModel.setTablePath(identifier.getTablePath());
       loadModel.setDateFormat(null);
       loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
           CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -175,9 +174,9 @@ public class StoreCreator {
 
   private static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
-    tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
-    tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
     List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
     ArrayList<Encoding> encodings = new ArrayList<>();
     encodings.add(Encoding.DICTIONARY);
@@ -257,16 +256,13 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+        identifier.getCarbonTableIdentifier().getTableUniqueName()
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
+    tableInfo.setTablePath(identifier.getTablePath());
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    String schemaFilePath = carbonTablePath.getSchemaFilePath();
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
@@ -329,7 +325,7 @@ public class StoreCreator {
       writer.close();
       writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+          new DictionaryColumnUniqueIdentifier(identifier,
         		  columnIdentifier, dims.get(i).getDataType()));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
@@ -444,7 +440,7 @@ public class StoreCreator {
     loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
-    String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator
+    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
         + CarbonTablePath.TABLE_STATUS_FILE;
 
     DataOutputStream dataOutputStream;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7b823ac..8c9889d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
@@ -60,8 +59,6 @@ public class StreamSegment {
    * get stream segment or create new stream segment if not exists
    */
   public static String open(CarbonTable table) throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -72,7 +69,8 @@ public class StreamSegment {
                 + " for stream table get or create segment");
 
         LoadMetadataDetails[] details =
-            SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         LoadMetadataDetails streamSegment = null;
         for (LoadMetadataDetails detail : details) {
           if (FileFormat.ROW_V1 == detail.getFileFormat()) {
@@ -97,8 +95,8 @@ public class StreamSegment {
             newDetails[i] = details[i];
           }
           newDetails[i] = newDetail;
-          SegmentStatusManager
-              .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails);
           return newDetail.getLoadName();
         } else {
           return streamSegment.getLoadName();
@@ -126,8 +124,6 @@ public class StreamSegment {
    */
   public static String close(CarbonTable table, String segmentId)
       throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -138,7 +134,8 @@ public class StreamSegment {
                 + " for stream table finish segment");
 
         LoadMetadataDetails[] details =
-            SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         for (LoadMetadataDetails detail : details) {
           if (segmentId.equals(detail.getLoadName())) {
             detail.setLoadEndTime(System.currentTimeMillis());
@@ -162,7 +159,8 @@ public class StreamSegment {
         }
         newDetails[i] = newDetail;
         SegmentStatusManager
-            .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+                table.getTablePath()), newDetails);
         return newDetail.getLoadName();
       } else {
         LOGGER.error(
@@ -192,7 +190,7 @@ public class StreamSegment {
     try {
       if (statusLock.lockWithRetries()) {
         LoadMetadataDetails[] details =
-            SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+            SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
         boolean updated = false;
         for (LoadMetadataDetails detail : details) {
           if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
@@ -202,10 +200,8 @@ public class StreamSegment {
           }
         }
         if (updated) {
-          CarbonTablePath tablePath =
-              CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
           SegmentStatusManager.writeLoadDetailsIntoFile(
-              tablePath.getTableStatusFilePath(),
+              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
               details);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 578f0cd..aa5ead2 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -223,7 +223,6 @@ object StreamHandoffRDD {
   ): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = carbonTable.getAbsoluteTableIdentifier
-    val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
     var continueHandoff = false
     // require handoff lock on table
     val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
@@ -240,7 +239,7 @@ object StreamHandoffRDD {
           try {
             if (statusLock.lockWithRetries()) {
               loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                tablePath.getMetadataDirectoryPath)
+                CarbonTablePath.getMetadataPath(identifier.getTablePath))
             }
           } finally {
             if (null != statusLock) {
@@ -378,19 +377,16 @@ object StreamHandoffRDD {
       loadModel: CarbonLoadModel
   ): Boolean = {
     var status = false
-    val metaDataFilepath =
-      loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
-    val identifier =
-      loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+    val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
+    val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
     val fileType = FileFactory.getFileType(metadataPath)
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType)
     }
-    val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
     val segmentStatusManager = new SegmentStatusManager(identifier)
-    val carbonLock = segmentStatusManager.getTableStatusLock()
+    val carbonLock = segmentStatusManager.getTableStatusLock
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
@@ -424,7 +420,7 @@ object StreamHandoffRDD {
         status = true
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
-          .getDatabaseName() + "." + loadModel.getTableName());
+          .getDatabaseName() + "." + loadModel.getTableName())
       }
     } finally {
       if (carbonLock.unlock()) {
@@ -435,6 +431,6 @@ object StreamHandoffRDD {
                      "." + loadModel.getTableName() + " during table status updation")
       }
     }
-    return status
+    status
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 75fcfb0..6316d84 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -127,16 +127,14 @@ object StreamSinkFactory {
    * @return
    */
   private def getStreamSegmentId(carbonTable: CarbonTable): String = {
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath)
-    if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) {
+    val segmentId = StreamSegment.open(carbonTable)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (!FileFactory.isFileExist(segmentDir, fileType)) {
       // Create table directory path, in case of enabling hive metastore first load may not have
       // table folder created.
-      FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
+      FileFactory.mkdirs(segmentDir, fileType)
     }
-    val segmentId = StreamSegment.open(carbonTable)
-    val segmentDir = carbonTablePath.getSegmentDir(segmentId)
     if (FileFactory.isFileExist(segmentDir, fileType)) {
       // recover fault
       StreamSegment.recoverSegmentIfRequired(segmentDir)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 4f1c999..6e6d092 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -64,9 +64,7 @@ class CarbonAppendableStreamSink(
     carbonLoadModel: CarbonLoadModel,
     server: Option[DictionaryServer]) extends Sink {
 
-  private val carbonTablePath = CarbonStorePath
-    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-  private val fileLogPath = carbonTablePath.getStreamingLogDir
+  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
   // prepare configuration
   private val hadoopConf = {
@@ -166,12 +164,12 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
     if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
       val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
-      val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
       FileFactory.mkdirs(newSegmentDir, fileType)
 
       // TODO trigger hand off operation
@@ -270,15 +268,13 @@ object CarbonAppendableStreamSink {
         }
 
         // update data file info in index file
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
+        StreamSegment.updateIndexFile(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
 
       } catch {
         // catch fault of executor side
         case t: Throwable =>
-          val tablePath =
-            CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-          val segmentDir = tablePath.getSegmentDir(segmentId)
+          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
           StreamSegment.recoverSegmentIfRequired(segmentDir)
           LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
           committer.abortJob(job)


Mime
View raw message