carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [01/50] [abbrv] incubator-carbondata git commit: [BUG] Failed to update table status on concurrent compaction and data load. (#741)
Date Thu, 30 Jun 2016 17:41:48 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e9bbf75c2 -> 6e943ff73


[BUG] Failed to update table status on concurrent compaction and data load. (#741)

* If acquiring the table status lock fails even after retries, then the data load or the
compaction should fail.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b0f2f060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b0f2f060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b0f2f060

Branch: refs/heads/master
Commit: b0f2f060f82669a4c62b2b7d6d649e49243846ba
Parents: 841e59d
Author: ravikiran23 <ravikiran.sn042@gmail.com>
Authored: Sat Jun 25 00:49:58 2016 +0530
Committer: Venkata Ramana G <g.ramana.v@gmail.com>
Committed: Sat Jun 25 00:49:58 2016 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/LocalFileLock.java    |   4 +-
 .../org/carbondata/core/locks/LockUsage.java    |   3 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java | 104 +++++++++++------
 .../spark/merger/CarbonDataMergerUtil.java      | 102 ++++++++++-------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  11 +-
 .../datacompaction/DataCompactionLockTest.scala | 111 +++++++++++++++++++
 .../lcm/status/SegmentStatusManager.java        |  13 +++
 7 files changed, 269 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
index 362b86f..2b26106 100644
--- a/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
@@ -84,9 +84,9 @@ public class LocalFileLock extends AbstractCarbonLock {
     this.lockUsage = lockUsage;
     location = location.replace("\\", "/");
     String tempStr = location.substring(0, location.lastIndexOf('/'));
+    cubeName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
+    tempStr = tempStr.substring(0, tempStr.lastIndexOf('/'));
     schemaName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
-
-    cubeName = location.substring(location.lastIndexOf('/') + 1, location.length());
     this.location =
         tmpPath + File.separator + schemaName + File.separator + cubeName + File.separator
             + this.lockUsage;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/core/src/main/java/org/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/carbondata/core/locks/LockUsage.java
index 850b6bf..f07cfb8 100644
--- a/core/src/main/java/org/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/carbondata/core/locks/LockUsage.java
@@ -24,6 +24,7 @@ package org.carbondata.core.locks;
  */
 public enum LockUsage {
   METADATA_LOCK,
-  COMPACTION_LOCK;
+  COMPACTION_LOCK,
+  TABLE_STATUS_LOCK;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index f78c328..427c7e5 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -29,6 +29,7 @@ import org.carbondata.core.cache.CacheProvider;
 import org.carbondata.core.cache.CacheType;
 import org.carbondata.core.cache.dictionary.Dictionary;
 import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
@@ -48,6 +49,7 @@ import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.carbondata.core.load.LoadMetadataDetails;
+import org.carbondata.core.locks.ICarbonLock;
 import org.carbondata.core.util.CarbonProperties;
 import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.core.util.CarbonUtilException;
@@ -692,52 +694,82 @@ public final class CarbonLoaderUtil {
   /**
    * This API will write the load level metadata for the loadmanagement module inorder to
    * manage the load and query execution management smoothly.
+   *
+   * @param loadCount
+   * @param loadMetadataDetails
+   * @param loadModel
+   * @param loadStatus
+   * @param startLoadTime
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
    */
-  public static void recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
+  public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
      CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException {
 
-    String dataLoadLocation = null;
-    //String dataLoadLocation = getLoadFolderPath(loadModel);
-    dataLoadLocation =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath() + File.separator
-            + CarbonCommonConstants.LOADMETADATA_FILENAME;
-    Gson gsonObjectToRead = new Gson();
-    List<LoadMetadataDetails> listOfLoadFolderDetails = null;
-    DataInputStream dataInputStream = null;
-    String loadEnddate = readCurrentTime();
-    loadMetadataDetails.setTimestamp(loadEnddate);
-    loadMetadataDetails.setLoadStatus(loadStatus);
-    loadMetadataDetails.setLoadName(String.valueOf(loadCount));
-    loadMetadataDetails.setLoadStartTime(startLoadTime);
-    LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-    try {
-      if (FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
+    boolean status = false;
 
-        dataInputStream = FileFactory
-            .getDataInputStream(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+    String metaDataFilepath =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
 
-        BufferedReader buffReader = new BufferedReader(new InputStreamReader(dataInputStream,
-            CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
-        listOfLoadFolderDetailsArray =
-            gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
-      }
-      listOfLoadFolderDetails =
-          new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(
+        absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
-      if (null != listOfLoadFolderDetailsArray) {
-        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-          listOfLoadFolderDetails.add(loadMetadata);
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info(
+            "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+                + " for table status updation");
+
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        String loadEnddate = readCurrentTime();
+        loadMetadataDetails.setTimestamp(loadEnddate);
+        loadMetadataDetails.setLoadStatus(loadStatus);
+        loadMetadataDetails.setLoadName(String.valueOf(loadCount));
+        loadMetadataDetails.setLoadStartTime(startLoadTime);
+
+        List<LoadMetadataDetails> listOfLoadFolderDetails =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+        if (null != listOfLoadFolderDetailsArray) {
+          for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+            listOfLoadFolderDetails.add(loadMetadata);
+          }
         }
-      }
-      listOfLoadFolderDetails.add(loadMetadataDetails);
+        listOfLoadFolderDetails.add(loadMetadataDetails);
 
-    } finally {
+        segmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+            .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
 
-      CarbonUtil.closeStreams(dataInputStream);
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation for table "
+ loadModel
+            .getDatabaseName() + "." + loadModel.getTableName());
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info(
+            "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
+                + "." + loadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
+                .getTableName() + " during table status updation");
+      }
     }
-    writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getDatabaseName(),
-        loadModel.getTableName(), listOfLoadFolderDetails);
-
+    return status;
   }
 
   public static void writeLoadMetadata(CarbonDataLoadSchema schema,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index e26989c..f536293 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -44,6 +44,7 @@ import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.load.LoadMetadataDetails;
+import org.carbondata.core.locks.ICarbonLock;
 import org.carbondata.core.util.CarbonProperties;
 import org.carbondata.integration.spark.merger.CompactionType;
 import org.carbondata.lcm.status.SegmentStatusManager;
@@ -139,60 +140,85 @@ public final class CarbonDataMergerUtil {
     return CarbonCommonConstants.LOAD_FOLDER + segmentNumber;
   }
 
-  public static void updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
+  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails>
loadsToMerge,
       String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
       String mergeLoadStartTime) {
 
+    boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager =
+        new SegmentStatusManager(absoluteTableIdentifier);
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+    ICarbonLock carbonLock =
+        segmentStatusManager.getTableStatusLock();
 
-    String statusFilePath = carbonTablePath.getTableStatusFilePath();
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+            + carbonLoadModel.getTableName() + " for table status updation ");
 
-    LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
+        CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
 
-    String mergedLoadNumber = MergedLoadName.substring(
-        MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
-            + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
+        String statusFilePath = carbonTablePath.getTableStatusFilePath();
 
-    String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
-    for (LoadMetadataDetails loadDetail : loadDetails) {
-      // check if this segment is merged.
-      if (loadsToMerge.contains(loadDetail)) {
-        loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
-        loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
-        loadDetail.setMergedLoadName(mergedLoadNumber);
-      }
-    }
+        LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
 
-    // create entry for merged one.
-    LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-    loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
-    loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
-    String loadEnddate = CarbonLoaderUtil.readCurrentTime();
-    loadMetadataDetails.setTimestamp(loadEnddate);
-    loadMetadataDetails.setLoadName(mergedLoadNumber);
-    loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
-    loadMetadataDetails.setPartitionCount("0");
+        String mergedLoadNumber = MergedLoadName.substring(
+            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
+                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
 
-    List<LoadMetadataDetails> updatedDetailsList =
-        new ArrayList<LoadMetadataDetails>(Arrays.asList(loadDetails));
+        String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
+        for (LoadMetadataDetails loadDetail : loadDetails) {
+          // check if this segment is merged.
+          if (loadsToMerge.contains(loadDetail)) {
+            loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
+            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
+            loadDetail.setMergedLoadName(mergedLoadNumber);
+          }
+        }
 
-    // put the merged folder entry
-    updatedDetailsList.add(loadMetadataDetails);
+        // create entry for merged one.
+        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
+        String loadEnddate = CarbonLoaderUtil.readCurrentTime();
+        loadMetadataDetails.setTimestamp(loadEnddate);
+        loadMetadataDetails.setLoadName(mergedLoadNumber);
+        loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
+        loadMetadataDetails.setPartitionCount("0");
 
-    try {
-      segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
-          updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
-    } catch (IOException e) {
-      LOGGER.error("Error while writing metadata");
-    }
+        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
 
+        // put the merged folder entry
+        updatedDetailsList.add(loadMetadataDetails);
+
+        try {
+          segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
+          tableStatusUpdationStatus = true;
+        } catch (IOException e) {
+          LOGGER.error("Error while writing metadata");
+        }
+      } else {
+        LOGGER.error(
+            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + "for table status updation");
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
+            .getDatabaseName() + "." + carbonLoadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + " during table status updation");
+      }
+    }
+    return tableStatusUpdationStatus;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index fc8d309..2575f0d 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -785,12 +785,19 @@ object CarbonDataRDDFactory extends Logging {
       } else {
         val metadataDetails = status(0)._2
         if (!isAgg) {
-          CarbonLoaderUtil
+          val status = CarbonLoaderUtil
             .recordLoadMetadata(currentLoadCount,
               metadataDetails,
               carbonLoadModel,
               loadStatus,
-              loadStartTime)
+              loadStartTime
+            )
+          if (!status) {
+            val message = "Dataload failed due to failure in table status updation."
+            logger.audit("Data load is failed.")
+            logger.error("Dataload failed due to failure in table status updation.")
+            throw new Exception(message)
+          }
         } else if (!carbonLoadModel.isRetentionRequest) {
           // TODO : Handle it
           logInfo("********Database updated**********")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
new file mode 100644
index 0000000..27fbb1a
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -0,0 +1,111 @@
+package org.carbondata.spark.testsuite.datacompaction
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.lcm.status.SegmentStatusManager
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+/**
+  * FT for data compaction Locking scenario.
+  */
+class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
+
+  val absoluteTableIdentifier: AbsoluteTableIdentifier = new
+      AbsoluteTableIdentifier(
+        CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+        new CarbonTableIdentifier("default", "compactionLockTestTable", "1")
+      )
+  val carbonTablePath: CarbonTablePath = CarbonStorePath
+    .getCarbonTablePath(absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getCarbonTableIdentifier
+    )
+  val dataPath: String = carbonTablePath.getMetadataDirectoryPath
+
+  val carbonLock: ICarbonLock =
+    CarbonLockFactory.getCarbonLockObj(dataPath, LockUsage.TABLE_STATUS_LOCK)
+
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty("carbon.enable.load.merge", "true")
+    sql("drop table if exists  compactionLockTestTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS compactionLockTestTable (country String, ID Int, date " +
+        "Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+    val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+      .getCanonicalPath
+    var csvFilePath1 = currentDirectory + "/src/test/resources/compaction/compaction1.csv"
+
+    var csvFilePath2 = currentDirectory + "/src/test/resources/compaction/compaction2.csv"
+    var csvFilePath3 = currentDirectory + "/src/test/resources/compaction/compaction3.csv"
+
+    sql("LOAD DATA fact from '" + csvFilePath1 + "' INTO CUBE compactionLockTestTable " +
+      "PARTITIONDATA" +
+      "(DELIMITER ',', QUOTECHAR '\"')"
+    )
+    sql("LOAD DATA fact from '" + csvFilePath2 + "' INTO CUBE compactionLockTestTable  " +
+      "PARTITIONDATA" +
+      "(DELIMITER ',', QUOTECHAR '\"')"
+    )
+    sql("LOAD DATA fact from '" + csvFilePath3 + "' INTO CUBE compactionLockTestTable  " +
+      "PARTITIONDATA" +
+      "(DELIMITER ',', QUOTECHAR '\"')"
+    )
+    // take the lock so that next compaction will be failed.
+    carbonLock.lockWithRetries()
+
+    // compaction should happen here.
+    sql("alter table compactionLockTestTable compact 'major'"
+    )
+  }
+
+  /**
+    * Compaction should fail as lock is being held purposefully
+    */
+  test("check if compaction is failed or not.") {
+    var status = true
+    var noOfRetries = 0
+    while (status && noOfRetries < 10) {
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+        absoluteTableIdentifier
+      )
+      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+
+      if (!segments.contains("0.1")) {
+        // wait for 2 seconds for compaction to complete.
+        Thread.sleep(2000)
+        noOfRetries += 1
+      }
+      else {
+        status = false
+      }
+    }
+    assert(status)
+  }
+
+
+  override def afterAll {
+    /* sql("drop cube compactionLockTestTable") */
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    carbonLock.unlock()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b0f2f060/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
index f8ac76d..8e4c627 100644
--- a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
@@ -70,6 +70,19 @@ public class SegmentStatusManager {
   }
 
   /**
+   * This will return the lock object used to lock the table status file before updation.
+   *
+   * @return
+   */
+  public ICarbonLock getTableStatusLock() {
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    String metaDataFilepath = carbonTablePath.getMetadataDirectoryPath();
+    return CarbonLockFactory.getCarbonLockObj(metaDataFilepath, LockUsage.TABLE_STATUS_LOCK);
+  }
+
+  /**
    * get valid segment for given table
    * @return
    * @throws IOException


Mime
View raw message