carbondata-commits mailing list archives

From qiang...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3900][CARBONDATA-3882][CARBONDATA-3881] Fix multiple concurrent issues in table status lock and segment lock for SI and maintable
Date Thu, 23 Jul 2020 09:46:26 GMT
This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c4c05d  [CARBONDATA-3900][CARBONDATA-3882][CARBONDATA-3881] Fix multiple concurrent issues in table status lock and segment lock for SI and maintable
3c4c05d is described below

commit 3c4c05d4f28321f571f6f05b013bf7d3507dc4bb
Author: ajantha-bhat <ajanthabhat@gmail.com>
AuthorDate: Fri Jun 26 19:59:59 2020 +0530

    [CARBONDATA-3900][CARBONDATA-3882][CARBONDATA-3881] Fix multiple concurrent issues in table status lock and segment lock for SI and maintable
    
    Why is this PR needed?
    [CARBONDATA-3900] Fix main table load failure in concurrent load and compaction scenario
    In the main table load flow, the segment lock was released before the table status was
    updated to success. A concurrent operation therefore treated the segment as stale (because
    its segment lock was not held) and cleaned it, which led to an "unable to get file status"
    exception.
    [CARBONDATA-3882] Fix wrong lock and missing table status lock in some SI flows
    In updateLoadMetadataWithMergeStatus, the SI table status is updated, but the lock was
    acquired on the main table.
    triggerCompaction, updateTableStatusForIndexTables -> the table status write was happening
    without a lock.
    [CARBONDATA-3881] Fix concurrent main table compaction and SI load issue
    Consider a scenario where segmentX has been loaded to the main table but failed to load to
    the SI table. While loading another segment, segmentY, the failed SI segmentX is reloaded.
    If by then segmentX has been compacted in the main table and clean files has been executed
    on it, the SI load fails: the segment id is not found in the segmentMap of the SI and an
    exception is thrown.
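The CARBONDATA-3881 scenario above suggests a guard before reloading failed SI segments. The following is a minimal Java sketch of such a guard, not the project's code: the class `SiReloadFilter`, the method `segmentsToReload`, and the plain string collections are hypothetical stand-ins for the load metadata structures that the actual change handles in CarbonIndexUtil.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in CarbonData.
class SiReloadFilter {

  // Returns the failed SI segments that should be reloaded: those that are
  // not already valid in the SI table and are still valid in the main table.
  static List<String> segmentsToReload(List<String> failedSiSegments,
      Set<String> validSiSegments, Set<String> validMainTableSegments) {
    List<String> result = new ArrayList<>();
    for (String segmentId : failedSiSegments) {
      boolean alreadyLoadedInSi = validSiSegments.contains(segmentId);
      // A segment that was compacted and cleaned in the main table is no
      // longer valid there, so reloading it into the SI table would fail.
      boolean stillValidInMainTable = validMainTableSegments.contains(segmentId);
      if (!alreadyLoadedInSi && stillValidInMainTable) {
        result.add(segmentId);
      }
    }
    return result;
  }
}
```

With this guard, a segment that has been compacted away in the main table is skipped rather than looked up later in a map that no longer contains it.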
    
    What changes were proposed in this PR?
    For [CARBONDATA-3900]:
    Release the segment lock only after the main table status is updated.
    For [CARBONDATA-3882]:
    In updateLoadMetadataWithMergeStatus, take the lock on the SI table.
    triggerCompaction, updateTableStatusForIndexTables -> take the table status lock for the
    table status write.
    For [CARBONDATA-3881]:
    Just before reloading a failed SI segment, check that it is a valid segment in the main table.
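The lock-ordering change for CARBONDATA-3900 can be illustrated with a small, self-contained Java sketch. It is only a model under stated assumptions: `SegmentLoadSketch`, `looksStale`, and `load` are hypothetical names, the status is a plain string, and `java.util.concurrent.locks.ReentrantLock` stands in for the file-based lock CarbonData actually uses. The point it shows is that the lock is released in a `finally` block that encloses the status update, so a concurrent cleaner never sees an in-progress segment without a lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of a segment load; not CarbonData code.
class SegmentLoadSketch {

  // Stand-in for the segment lock (CarbonData uses its own lock type).
  private final ReentrantLock segmentLock = new ReentrantLock();
  private volatile String status = "INSERT_IN_PROGRESS";

  // A concurrent cleaner would treat the segment as stale when it is not
  // yet successful and nobody holds its lock.
  boolean looksStale() {
    return !segmentLock.isLocked() && !"SUCCESS".equals(status);
  }

  // Runs the load. The probe is invoked after the data is written but
  // before the status is updated, which is the window the fix protects.
  void load(Runnable probeBeforeStatusUpdate) {
    segmentLock.lock();
    try {
      // ... write data files (omitted in this sketch) ...
      probeBeforeStatusUpdate.run();
      status = "SUCCESS";
    } finally {
      // Release only after the status update has been attempted.
      segmentLock.unlock();
    }
  }
}
```

If the unlock were moved before the status assignment, the probe window would expose a segment that is both unlocked and not yet successful, which is the state a concurrent clean-up would act on.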
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3810
---
 .../spark/rdd/CarbonDataRDDFactory.scala           | 361 +++++++++++----------
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  12 +-
 .../load/CarbonInternalLoaderUtil.java             |  28 +-
 .../spark/sql/secondaryindex/load/Compactor.scala  |   2 -
 .../secondaryindex/util/SecondaryIndexUtil.scala   |  82 +++--
 5 files changed, 256 insertions(+), 229 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b049373..daa4ca2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -482,88 +482,77 @@ object CarbonDataRDDFactory {
         errorMessage = errorMsgLocal
         LOGGER.info(errorMessage)
         LOGGER.error(ex)
-    } finally {
-      segmentLock.unlock()
     }
-    // handle the status file updation for the update cmd.
-    if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
-      if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-        CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
-        return null
-      } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
-                 updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
-                 carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-        return null
-      } else {
-        // in success case handle updation of the table status file.
-        // success case.
-        val segmentDetails = new util.HashSet[Segment]()
-        var resultSize = 0
-        res.foreach { resultOfSeg =>
-          resultSize = resultSize + resultOfSeg.size
-          resultOfSeg.foreach { resultOfBlock =>
-            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
-          }
-        }
-        var segmentMetaDataInfoMap = scala.collection.mutable.Map.empty[String, SegmentMetaDataInfo]
-        if (!segmentMetaDataAccumulator.isZero) {
-          segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) {
-            segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
-          })
-        }
-        val segmentFiles = updateSegmentFiles(carbonTable,
-          segmentDetails,
-          updateModel.get,
-          segmentMetaDataInfoMap.asJava)
-
-        // this means that the update doesnt have any records to update so no need to do table
-        // status file updation.
-        if (resultSize == 0) {
+    try {
+      // handle the status file updation for the update cmd.
+      if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
+        if (loadStatus == SegmentStatus.LOAD_FAILURE) {
+          CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
           return null
-        }
-        if (!CarbonUpdateUtil.updateTableMetadataStatus(
-          segmentDetails,
-          carbonTable,
-          updateModel.get.updatedTimeStamp + "",
-          true,
-          new util.ArrayList[Segment](0),
-          new util.ArrayList[Segment](segmentFiles), "")) {
-          LOGGER.error("Data update failed due to failure in table status updation.")
-          updateModel.get.executorErrors.errorMsg = errorMessage
-          updateModel.get.executorErrors.failureCauses = FailureCauses
-            .STATUS_FILE_UPDATION_FAILURE
+        } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+                   updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
+                   carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
           return null
+        } else {
+          // in success case handle updation of the table status file.
+          // success case.
+          val segmentDetails = new util.HashSet[Segment]()
+          var resultSize = 0
+          res.foreach { resultOfSeg =>
+            resultSize = resultSize + resultOfSeg.size
+            resultOfSeg.foreach { resultOfBlock =>
+              segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
+            }
+          }
+          var segmentMetaDataInfoMap = scala
+            .collection
+            .mutable
+            .Map
+            .empty[String, SegmentMetaDataInfo]
+          if (!segmentMetaDataAccumulator.isZero) {
+            segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) {
+              segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
+            })
+          }
+          val segmentFiles = updateSegmentFiles(carbonTable,
+            segmentDetails,
+            updateModel.get,
+            segmentMetaDataInfoMap.asJava)
+
+          // this means that the update doesnt have any records to update so no need to do table
+          // status file updation.
+          if (resultSize == 0) {
+            return null
+          }
+          if (!CarbonUpdateUtil.updateTableMetadataStatus(
+            segmentDetails,
+            carbonTable,
+            updateModel.get.updatedTimeStamp + "",
+            true,
+            new util.ArrayList[Segment](0),
+            new util.ArrayList[Segment](segmentFiles), "")) {
+            LOGGER.error("Data update failed due to failure in table status updation.")
+            updateModel.get.executorErrors.errorMsg = errorMessage
+            updateModel.get.executorErrors.failureCauses = FailureCauses
+              .STATUS_FILE_UPDATION_FAILURE
+            return null
+          }
+          // code to handle Pre-Priming cache for update command
+          if (!segmentFiles.isEmpty) {
+            val segmentsToPrePrime = segmentFiles
+              .asScala
+              .map(iterator => iterator.getSegmentNo)
+              .toSeq
+            DistributedRDDUtils
+              .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
+                operationContext, hadoopConf, segmentsToPrePrime.toList)
+          }
         }
-        // code to handle Pre-Priming cache for update command
-        if (!segmentFiles.isEmpty) {
-          val segmentsToPrePrime = segmentFiles.asScala.map(iterator => iterator.getSegmentNo).toSeq
-          DistributedRDDUtils
-            .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
-              operationContext, hadoopConf, segmentsToPrePrime.toList)
-        }
-      }
-      return null
-    }
-    val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
-      .asInstanceOf[String]
-    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-      // update the load entry in table status file for changing the status to marked for delete
-      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
-      LOGGER.info("********starting clean up**********")
-      if (carbonLoadModel.isCarbonTransactionalTable) {
-        // delete segment is applicable for transactional table
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-        clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+        return null
       }
-      LOGGER.info("********clean up done**********")
-      LOGGER.warn("Cannot write load metadata file as data load failed")
-      throw new Exception(errorMessage)
-    } else {
-      // check if data load fails due to bad record and throw data load failure due to
-      // bad record exception
-      if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
-          status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
-          carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+      val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
+        .asInstanceOf[String]
+      if (loadStatus == SegmentStatus.LOAD_FAILURE) {
         // update the load entry in table status file for changing the status to marked for delete
         CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
@@ -573,114 +562,138 @@ object CarbonDataRDDFactory {
           clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
-        throw new Exception(status(0)._2._2.errorMsg)
-      }
-      // as no record loaded in new segment, new segment should be deleted
-      val newEntryLoadStatus =
-        if (carbonLoadModel.isCarbonTransactionalTable &&
-            !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV &&
-            !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
-          LOGGER.warn("Cannot write load metadata file as there is no data to load")
-          SegmentStatus.MARKED_FOR_DELETE
-        } else {
-          loadStatus
+        LOGGER.warn("Cannot write load metadata file as data load failed")
+        throw new Exception(errorMessage)
+      } else {
+        // check if data load fails due to bad record and throw data load failure due to
+        // bad record exception
+        if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+            status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
+            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+          // update the load entry in table status file for changing the status to marked for delete
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
+          LOGGER.info("********starting clean up**********")
+          if (carbonLoadModel.isCarbonTransactionalTable) {
+            // delete segment is applicable for transactional table
+            CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+            clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+          }
+          LOGGER.info("********clean up done**********")
+          throw new Exception(status(0)._2._2.errorMsg)
         }
+        // as no record loaded in new segment, new segment should be deleted
+        val newEntryLoadStatus =
+          if (carbonLoadModel.isCarbonTransactionalTable &&
+              !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV &&
+              !CarbonLoaderUtil.isValidSegment(carbonLoadModel,
+                carbonLoadModel.getSegmentId.toInt)) {
+            LOGGER.warn("Cannot write load metadata file as there is no data to load")
+            SegmentStatus.MARKED_FOR_DELETE
+          } else {
+            loadStatus
+          }
 
-      val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
-        carbonLoadModel.getSegmentId,
-        segmentMetaDataAccumulator)
-      val segmentFileName =
-        SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
-          String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
-      // clear segmentMetaDataAccumulator
-      segmentMetaDataAccumulator.reset()
-
-      SegmentFileStore.updateTableStatusFile(
-        carbonTable,
-        carbonLoadModel.getSegmentId,
-        segmentFileName,
-        carbonTable.getCarbonTableIdentifier.getTableId,
-        new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
+        val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+          carbonLoadModel.getSegmentId,
+          segmentMetaDataAccumulator)
+        val segmentFileName =
+          SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
+            String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
+        // clear segmentMetaDataAccumulator
+        segmentMetaDataAccumulator.reset()
 
-      operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
-        carbonLoadModel.getSegmentId)
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-        new LoadTablePreStatusUpdateEvent(
-          carbonTable.getCarbonTableIdentifier,
-          carbonLoadModel)
-      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-      val (done, writtenSegment) =
-        updateTableStatus(
-          sqlContext.sparkSession,
-          status,
-          carbonLoadModel,
-          newEntryLoadStatus,
-          overwriteTable,
+        SegmentFileStore.updateTableStatusFile(
+          carbonTable,
+          carbonLoadModel.getSegmentId,
           segmentFileName,
-          updateModel,
-          uniqueTableStatusId)
-      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
-        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
-      val commitComplete = try {
+          carbonTable.getCarbonTableIdentifier.getTableId,
+          new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
+
+        operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+          carbonLoadModel.getSegmentId)
+        val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+          new LoadTablePreStatusUpdateEvent(
+            carbonTable.getCarbonTableIdentifier,
+            carbonLoadModel)
         OperationListenerBus.getInstance()
-          .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
-        true
-      } catch {
-        case ex: Exception =>
-          LOGGER.error("Problem while committing indexes", ex)
-          false
-      }
-      if (!done || !commitComplete) {
-        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
-        LOGGER.info("********starting clean up**********")
-        if (carbonLoadModel.isCarbonTransactionalTable) {
-          // delete segment is applicable for transactional table
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-          // delete corresponding segment file from metadata
-          val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
-                            File.separator + segmentFileName
-          FileFactory.deleteFile(segmentFile)
-          clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+          .fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+        val (done, writtenSegment) =
+          updateTableStatus(
+            sqlContext.sparkSession,
+            status,
+            carbonLoadModel,
+            newEntryLoadStatus,
+            overwriteTable,
+            segmentFileName,
+            updateModel,
+            uniqueTableStatusId)
+        val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+          new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+        val commitComplete = try {
+          OperationListenerBus.getInstance()
+            .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+          true
+        } catch {
+          case ex: Exception =>
+            LOGGER.error("Problem while committing indexes", ex)
+            false
+        }
+        if (!done || !commitComplete) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
+          LOGGER.info("********starting clean up**********")
+          if (carbonLoadModel.isCarbonTransactionalTable) {
+            // delete segment is applicable for transactional table
+            CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+            // delete corresponding segment file from metadata
+            val segmentFile =
+              CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
+              File.separator + segmentFileName
+            FileFactory.deleteFile(segmentFile)
+            clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+          }
+          LOGGER.info("********clean up done**********")
+          LOGGER.error("Data load failed due to failure in table status updation.")
+          throw new Exception("Data load failed due to failure in table status updation.")
+        }
+        if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
+          LOGGER.info("Data load is partially successful for " +
+                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        } else {
+          LOGGER.info("Data load is successful for " +
+                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-        LOGGER.info("********clean up done**********")
-        LOGGER.error("Data load failed due to failure in table status updation.")
-        throw new Exception("Data load failed due to failure in table status updation.")
-      }
-      if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
-        LOGGER.info("Data load is partially successful for " +
-                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      } else {
-        LOGGER.info("Data load is successful for " +
-                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      }
 
-      // code to handle Pre-Priming cache for loading
+        // code to handle Pre-Priming cache for loading
 
-      if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-        DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-          operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-      }
-      try {
-        // compaction handling
-        if (carbonTable.isHivePartitionTable) {
-          carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
+            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
         }
-        val compactedSegments = new util.ArrayList[String]()
-        handleSegmentMerging(sqlContext,
-          carbonLoadModel
-            .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-          carbonTable,
-          compactedSegments,
-          operationContext)
-        carbonLoadModel.setMergedSegmentIds(compactedSegments)
-        writtenSegment
-      } catch {
-        case e: Exception =>
-          LOGGER.error(
-            "Auto-Compaction has failed. Ignoring this exception because the" +
-            " load is passed.", e)
+        try {
+          // compaction handling
+          if (carbonTable.isHivePartitionTable) {
+            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+          }
+          val compactedSegments = new util.ArrayList[String]()
+          handleSegmentMerging(sqlContext,
+            carbonLoadModel
+              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
+            carbonTable,
+            compactedSegments,
+            operationContext)
+          carbonLoadModel.setMergedSegmentIds(compactedSegments)
           writtenSegment
+        } catch {
+          case e: Exception =>
+            LOGGER.error(
+              "Auto-Compaction has failed. Ignoring this exception because the" +
+              " load is passed.", e)
+            writtenSegment
+        }
       }
+    } finally {
+      // Release the segment lock, once table status is finally updated
+      segmentLock.unlock()
     }
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 61c25e9..86576fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -274,19 +274,21 @@ object CarbonIndexUtil {
     if (isLoadToFailedSISegments && null != failedLoadMetaDataDetils) {
       val metadata = CarbonInternalLoaderUtil
         .getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath))
+      segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
+        .getSegmentToLoadStartTimeMapping(carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
+        .asScala
       failedLoadMetaDataDetils.asScala.foreach(loadMetaDetail => {
         // check whether this segment is valid or invalid, if it is present in the valid list
-        // then don't consider it for reloading
-        if (!metadata.contains(loadMetaDetail.getLoadName)) {
+        // then don't consider it for reloading.
+        // Also main table should have this as a valid segment for reloading.
+        if (!metadata.contains(loadMetaDetail.getLoadName) &&
+            segmentIdToLoadStartTimeMapping.contains(loadMetaDetail.getLoadName)) {
           segmentsToReload.append(loadMetaDetail.getLoadName)
         }
       })
       LOGGER.info(
         s"SI segments to be reloaded for index table: ${
           indexTable.getTableUniqueName} are: ${segmentsToReload}")
-      segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
-        .getSegmentToLoadStartTimeMapping(carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
-        .asScala
     } else {
       segmentIdToLoadStartTimeMapping = scala.collection.mutable
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index ccb1e2a..f6a8835 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -235,22 +235,20 @@ public class CarbonInternalLoaderUtil {
    *
    */
   public static boolean updateLoadMetadataWithMergeStatus(CarbonTable indexCarbonTable,
-      String[] loadsToMerge, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
-      Map<String, String> segmentToLoadStartTimeMap, long mergeLoadStartTime,
-      SegmentStatus segmentStatus, long newLoadStartTime, List<String> rebuiltSegments)
-      throws IOException {
+      String[] loadsToMerge, String mergedLoadNumber, Map<String, String> segmentToLoadStartTimeMap,
+      long mergeLoadStartTime, SegmentStatus segmentStatus, long newLoadStartTime,
+      List<String> rebuiltSegments) throws IOException {
     boolean tableStatusUpdationStatus = false;
     List<String> loadMergeList = new ArrayList<>(Arrays.asList(loadsToMerge));
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager =
+        new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier());
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
     try {
       if (carbonLock.lockWithRetries()) {
-        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
-            + carbonLoadModel.getTableName() + " for table status updation ");
+        LOGGER.info("Acquired lock for the table " + indexCarbonTable.getDatabaseName() + "."
+            + indexCarbonTable.getTableName() + " for table status updation ");
         LoadMetadataDetails[] loadDetails =
             SegmentStatusManager.readLoadMetadata(indexCarbonTable.getMetadataPath());
 
@@ -305,17 +303,17 @@ public class CarbonInternalLoaderUtil {
         tableStatusUpdationStatus = true;
       } else {
         LOGGER.error(
-            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + "for table status updation");
+            "Could not able to obtain lock for table" + indexCarbonTable.getDatabaseName() + "."
+                + indexCarbonTable.getTableName() + "for table status updation");
       }
     } finally {
       if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
-            .getDatabaseName() + "." + carbonLoadModel.getTableName());
+        LOGGER.info("Table unlocked successfully after table status updation" + indexCarbonTable
+            .getDatabaseName() + "." + indexCarbonTable.getTableName());
       } else {
         LOGGER.error(
-            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + " during table status updation");
+            "Unable to unlock Table lock for table" + indexCarbonTable.getDatabaseName() + "."
+                + indexCarbonTable.getTableName() + " during table status updation");
       }
     }
     return tableStatusUpdationStatus;
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index e1f009d..39b3d94 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -85,7 +85,6 @@ object Compactor {
           indexCarbonTable,
           loadsToMerge,
           validSegments.head,
-          carbonLoadModel,
           segmentToSegmentTimestampMap,
           segmentIdToLoadStartTimeMapping(validSegments.head),
           SegmentStatus.INSERT_IN_PROGRESS, 0L, List.empty.asJava)
@@ -119,7 +118,6 @@ object Compactor {
           indexCarbonTable,
           loadsToMerge,
           validSegments.head,
-          carbonLoadModel,
           segmentToSegmentTimestampMap,
           segmentIdToLoadStartTimeMapping(validSegments.head),
           SegmentStatus.SUCCESS,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 298be7a..498c739 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -220,24 +220,32 @@ object SecondaryIndexUtil {
             segment
           }
 
-          val endTime = System.currentTimeMillis()
-          val loadMetadataDetails = SegmentStatusManager
-            .readLoadMetadata(indexCarbonTable.getMetadataPath)
-          loadMetadataDetails.foreach(loadMetadataDetail => {
-            if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
-              loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
-              loadMetadataDetail.setLoadEndTime(endTime)
-              CarbonLoaderUtil
-                .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
-                  loadMetadataDetail.getLoadName,
-                  indexCarbonTable)
+          val statusLock =
+            new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
+          try {
+            if (statusLock.lockWithRetries()) {
+              val endTime = System.currentTimeMillis()
+              val loadMetadataDetails = SegmentStatusManager
+                .readLoadMetadata(indexCarbonTable.getMetadataPath)
+              loadMetadataDetails.foreach(loadMetadataDetail => {
+                if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
+                  loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
+                  loadMetadataDetail.setLoadEndTime(endTime)
+                  CarbonLoaderUtil
+                    .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
+                      loadMetadataDetail.getLoadName,
+                      indexCarbonTable)
+                }
+              })
+              SegmentStatusManager
+                .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
+                  loadMetadataDetails)
             }
-          })
-
-          SegmentStatusManager
-            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
-              loadMetadataDetails)
-
+          } finally {
+            if (statusLock != null) {
+              statusLock.unlock()
+            }
+          }
           // clear the indexSchema cache for the merged segments, as the index files and
           // data files are rewritten after compaction
           if (mergedSegments.size > 0) {
@@ -435,24 +443,32 @@ object SecondaryIndexUtil {
     val loadFolderDetailsArrayMainTable =
       SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetadataPath)
     indexTables.asScala.foreach { indexTable =>
-      val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath)
-      if (CarbonUtil.isFileExists(tableStatusFilePath)) {
-        val loadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(indexTable
-          .getMetadataPath);
-        if (null != loadFolderDetailsArray && loadFolderDetailsArray.nonEmpty) {
-          try {
-            SegmentStatusManager.writeLoadDetailsIntoFile(
-              CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath),
-              updateTimeStampForIndexTable(loadFolderDetailsArrayMainTable,
-                loadFolderDetailsArray))
-          } catch {
-            case ex: Exception =>
-              LOGGER.error(ex.getMessage);
+      val statusLock =
+        new SegmentStatusManager(indexTable.getAbsoluteTableIdentifier).getTableStatusLock
+      try {
+        if (statusLock.lockWithRetries()) {
+          val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath)
+          if (CarbonUtil.isFileExists(tableStatusFilePath)) {
+            val loadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(indexTable
+              .getMetadataPath);
+            if (null != loadFolderDetailsArray && loadFolderDetailsArray.nonEmpty) {
+              SegmentStatusManager.writeLoadDetailsIntoFile(
+                CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath),
+                updateTimeStampForIndexTable(loadFolderDetailsArrayMainTable,
+                  loadFolderDetailsArray))
+            }
+          } else {
+            LOGGER.info(
+              "Table status file does not exist for index table: " + indexTable.getTableUniqueName)
           }
         }
-      } else {
-        LOGGER.info(
-          "Table status file does not exist for index table: " + indexTable.getTableUniqueName)
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex.getMessage);
+      } finally {
+        if (statusLock != null) {
+          statusLock.unlock()
+        }
       }
     }
   }

