carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kunalkap...@apache.org
Subject carbondata git commit: [CARBONDATA-2061] Check for only valid IN_PROGRESS segments
Date Mon, 29 Jan 2018 08:22:59 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 241b2657a -> 12ccf708f


[CARBONDATA-2061] Check for only valid IN_PROGRESS segments

Problem:
During operations such as drop, delete segment, compaction, and IUD, the table is checked for
IN_PROGRESS segments. This check only reads the tblstatus file for IN_PROGRESS entries; it does
not verify whether those segments are valid.

Solution:
The check should validate each IN_PROGRESS segment and classify it as either a valid or an
invalid (stale) IN_PROGRESS segment.

This closes #1844


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/12ccf708
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/12ccf708
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/12ccf708

Branch: refs/heads/master
Commit: 12ccf708f7b10fd9c7667aa37dc04938e76f36c6
Parents: 241b265
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Mon Jan 22 14:42:07 2018 +0530
Committer: kunal642 <kunalkapoor642@gmail.com>
Committed: Mon Jan 29 13:50:49 2018 +0530

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 61 ++++++++++++++------
 .../processing/util/CarbonLoaderUtil.java       |  8 ++-
 2 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/12ccf708/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index e1fadcf..6af0304 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -292,7 +292,7 @@ public class SegmentStatusManager {
         // read existing metadata details in load metadata.
         listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
         if (listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
+          updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
           if (invalidLoadIds.isEmpty()) {
             // All or None , if anything fails then dont write
             if (carbonTableStatusLock.lockWithRetries()) {
@@ -376,8 +376,8 @@ public class SegmentStatusManager {
         // read existing metadata details in load metadata.
         listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
         if (listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps,
-              loadStartTime);
+          updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray,
+              invalidLoadTimestamps, loadStartTime);
           if (invalidLoadTimestamps.isEmpty()) {
             if (carbonTableStatusLock.lockWithRetries()) {
               LOG.info("Table status lock has been successfully acquired.");
@@ -471,8 +471,9 @@ public class SegmentStatusManager {
    * @param invalidLoadIds
    * @return invalidLoadIds
    */
-  private static List<String> updateDeletionStatus(List<String> loadIds,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) {
+  private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
+      List<String> loadIds, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
+      List<String> invalidLoadIds) {
     SegmentStatus segmentStatus = null;
     for (String loadId : loadIds) {
       boolean loadFound = false;
@@ -488,12 +489,14 @@ public class SegmentStatusManager {
             LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus) {
+          } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus
+              && checkIfValidLoadInProgress(absoluteTableIdentifier, loadId)) {
             // if the segment status is in progress then no need to delete that.
             LOG.error("Cannot delete the segment " + loadId + " which is load in progress");
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus) {
+          } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus
+              && checkIfValidLoadInProgress(absoluteTableIdentifier, loadId)) {
             // if the segment status is overwrite in progress, then no need to delete that.
             LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " +
                     "in progress");
@@ -531,9 +534,9 @@ public class SegmentStatusManager {
    * @param invalidLoadTimestamps
    * @return invalidLoadTimestamps
    */
-  public static List<String> updateDeletionStatus(String loadDate,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
-      Long loadStartTime) {
+  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
+      List<String> invalidLoadTimestamps, Long loadStartTime) {
     // For each load timestamp loop through data and if the
     // required load timestamp is found then mark
     // the metadata as deleted.
@@ -550,14 +553,19 @@ public class SegmentStatusManager {
         } else if (SegmentStatus.STREAMING == segmentStatus) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment is streaming in progress.");
-        } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus &&
-            SegmentStatus.INSERT_IN_PROGRESS != segmentStatus &&
-            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != segmentStatus) {
+        } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus && checkIfValidLoadInProgress(
+            absoluteTableIdentifier, loadMetadata.getLoadName())) {
+          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+              + "as the segment is insert in progress.");
+        } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus
+            && checkIfValidLoadInProgress(absoluteTableIdentifier, loadMetadata.getLoadName())) {
+          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+              + "as the segment is insert overwrite in progress.");
+        } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) {
           loadFound = true;
           updateSegmentMetadataDetails(loadMetadata);
-          LOG.info("Info: " +
-              loadStartTimeString + loadMetadata.getLoadStartTime() +
-              " Marked for Delete");
+          LOG.info("Info: " + loadStartTimeString + loadMetadata.getLoadStartTime()
+              + " Marked for Delete");
         }
       }
     }
@@ -700,11 +708,30 @@ public class SegmentStatusManager {
         SegmentStatus segmentStatus = loaddetail.getSegmentStatus();
         if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
                 segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
-          loadInProgress = true;
+          loadInProgress =
+              checkIfValidLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
+                  loaddetail.getLoadName());
         }
       }
     }
     return loadInProgress;
   }
 
+  /**
+   * This method will check for valid IN_PROGRESS segments.
+   * Tries to acquire a lock on the segment and decide on the stale segments
+   * @param absoluteTableIdentifier
+   *
+   */
+  public static Boolean checkIfValidLoadInProgress(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String loadId) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+        CarbonTablePath.addSegmentPrefix(loadId) + LockUsage.LOCK);
+    try {
+      return !segmentLock.lockWithRetries(1, 0);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12ccf708/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1991017..fdc2cc3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -199,10 +199,14 @@ public final class CarbonLoaderUtil {
           // 2. If load or insert into operation is in progress and insert overwrite operation
           // is triggered
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
-            if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+            if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+                && segmentStatusManager.checkIfValidLoadInProgress(
+                    absoluteTableIdentifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert overwrite is in progress");
             } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-                && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+                && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
+                && segmentStatusManager.checkIfValidLoadInProgress(
+                    absoluteTableIdentifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert into or load is in progress");
             }
           }


Mime
View raw message