carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent Load & Compaction operation
Date Wed, 30 Dec 2020 03:28:26 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f9027  [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable
after concurrent Load & Compaction operation
19f9027 is described below

commit 19f902785351f31be44fa89268d8a4b1854dc347
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Mon Dec 28 21:38:10 2020 +0530

    [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent
Load & Compaction operation
    
    Why is this PR needed?
    When concurrent LOAD and COMPACTION are in progress on a main table having SI, the SILoadEventListenerForFailedSegments
listener is called to repair failed SI segments, if any. It compares the SI and main table
segment status; if there is a mismatch, it adds that specific load to failedLoads
to be reloaded.
    
    During compaction, the SI is updated first and then the main table. So, in some cases, the SI
segment will be in COMPACTED state while the main table segment is still in SUCCESS state (either
because the compaction is still in progress or because some operation failed). SI repair adds those
segments to failedLoads after checking whether the segment lock can be acquired. But if the main table
compaction finishes by the time the SI repair comparison is done, the repair can still acquire the segment lock
and add those loads to failedL [...]
    
    What changes were proposed in this PR?
    Acquire the compaction lock on the main table (to ensure compaction is not running), and then compare
the SI and main table load details to repair SI segments.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No (concurrent scenario)
    
    This closes #4067
---
 .../command/index/IndexRepairCommand.scala         |  42 +--
 .../apache/spark/sql/index/CarbonIndexUtil.scala   | 325 ++++++++++++---------
 .../SILoadEventListenerForFailedSegments.scala     |   7 +-
 3 files changed, 190 insertions(+), 184 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
index 43efba8..6bcea43 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
@@ -26,11 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.index.CarbonIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 
 /**
@@ -73,26 +69,11 @@ extends DataCommand {
       .lookupRelation(Some(databaseName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation].carbonTable
 
-    val tableStatusLock = CarbonLockFactory
-      .getCarbonLockObj(mainCarbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
-      val carbonLoadModel = new CarbonLoadModel
-      carbonLoadModel.setDatabaseName(databaseName)
-      carbonLoadModel.setTableName(tableName)
-      carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
-    try {
-      if (tableStatusLock.lockWithRetries()) {
-        val tableStatusFilePath = CarbonTablePath
-          .getTableStatusFilePath(mainCarbonTable.getTablePath)
-        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
-          .readTableStatusFile(tableStatusFilePath).toList.asJava)
-        carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
-      } else {
-        throw new ConcurrentOperationException(mainCarbonTable.getDatabaseName,
-          mainCarbonTable.getTableName, "table status read", "reindex command")
-      }
-    } finally {
-      tableStatusLock.unlock()
-    }
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableName)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
     val indexMetadata = mainCarbonTable.getIndexMetadata
     val secondaryIndexProvider = IndexType.SI.getIndexProviderName
     if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
@@ -101,19 +82,11 @@ extends DataCommand {
         .get(secondaryIndexProvider).keySet().asScala
       // if there are no index tables for a given fact table do not perform any action
       if (indexTables.nonEmpty) {
-        val mainTableDetails = if (segments.isEmpty) {
-          carbonLoadModel.getLoadMetadataDetails.asScala.toList
-        } else {
-          // get segments for main table
-          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
-            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
-        }
         if (indexTableToRepair.isEmpty) {
           indexTables.foreach {
             indexTableName =>
               CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
-                indexMetadata, mainTableDetails, secondaryIndexProvider,
-                Integer.MAX_VALUE)(sparkSession)
+                indexMetadata, secondaryIndexProvider, Integer.MAX_VALUE, segments)(sparkSession)
           }
         } else {
           val indexTablesToRepair = indexTables.filter(indexTable => indexTable
@@ -121,8 +94,7 @@ extends DataCommand {
           indexTablesToRepair.foreach {
             indexTableName =>
               CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
-                indexMetadata, mainTableDetails, secondaryIndexProvider,
-                Integer.MAX_VALUE)(sparkSession)
+                indexMetadata, secondaryIndexProvider, Integer.MAX_VALUE, segments)(sparkSession)
           }
           if (indexTablesToRepair.isEmpty) {
             throw new Exception("Unable to find index table" + indexTableToRepair.get)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 85d8ffb..3f229b0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock,
LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
@@ -393,10 +394,10 @@ object CarbonIndexUtil {
   }
 
   def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
-    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
-      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String,
-                      repairLimit: Int)
-  (sparkSession: SparkSession) : Unit = {
+      carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      secondaryIndexProvider: String, repairLimit: Int,
+      segments: Option[List[String]] = Option.empty,
+      isLoadOrCompaction: Boolean = false)(sparkSession: SparkSession): Unit = {
     // when Si creation and load to main table are parallel, get the carbonTable from the
     // metastore which will have the latest index Info
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -406,158 +407,194 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction
lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case exception: Exception =>
+            if (!isLoadOrCompaction) {
+              throw exception
+            }
+            return;
+        }
+        carbonLoadModel.setLoadMetadataDetails(mainTableDetails.toList.asJava)
+        if (segments.isDefined) {
+          mainTableDetails = mainTableDetails.filter(
+            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+        if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+          mainTableDetails,
+          siTblLoadMetadataDetails)) {
+          val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+            indexTableName)
+          val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
+            indexMetadata.getParentTableName,
+            indexColumns.split(",").toList,
+            indexTableName)
+
+          // If it empty, then no need to do further computations because the
+          // tabletstatus might not have been created and hence next load will take care
+          if (siTblLoadMetadataDetails.isEmpty) {
+            Seq.empty
+          }
 
-      val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
-      .ArrayList[LoadMetadataDetails]()
-
-      // read the details of SI table and get all the failed segments during SI
-      // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
-      siTblLoadMetadataDetails.foreach {
-        case loadMetaDetail: LoadMetadataDetails =>
-          if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
-            checkIfMainTableLoadIsValid(mainTableDetails.toArray,
-              loadMetaDetail.getLoadName) && repairLimit > failedLoadMetadataDetails.size()
) {
-            failedLoadMetadataDetails.add(loadMetaDetail)
-          } else if ((loadMetaDetail.getSegmentStatus ==
-            SegmentStatus.INSERT_IN_PROGRESS ||
-            loadMetaDetail.getSegmentStatus ==
-              SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
-            checkIfMainTableLoadIsValid(mainTableDetails.toArray,
-              loadMetaDetail.getLoadName) && repairLimit > failedLoadMetadataDetails.size())
{
-            val segmentLock = CarbonLockFactory
-              .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
-                CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
-                  LockUsage.LOCK)
-            try {
-              if (segmentLock.lockWithRetries(1, 0)) {
-                LOGGER
-                  .info("SIFailedLoadListener: Acquired segment lock on segment:" +
-                    loadMetaDetail.getLoadName)
+          val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
+          .ArrayList[LoadMetadataDetails]()
+
+          // read the details of SI table and get all the failed segments during SI
+          // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
+          siTblLoadMetadataDetails.foreach {
+            case loadMetaDetail: LoadMetadataDetails =>
+              val isMainTableLoadValid = checkIfMainTableLoadIsValid(mainTableDetails,
+                loadMetaDetail.getLoadName)
+              if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
+                  isMainTableLoadValid && repairLimit > failedLoadMetadataDetails.size())
{
                 failedLoadMetadataDetails.add(loadMetaDetail)
+              } else if ((loadMetaDetail.getSegmentStatus ==
+                          SegmentStatus.INSERT_IN_PROGRESS ||
+                          loadMetaDetail.getSegmentStatus ==
+                          SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
+                         isMainTableLoadValid && repairLimit > failedLoadMetadataDetails.size())
{
+                val segmentLock = CarbonLockFactory
+                  .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
+                    CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
+                    LockUsage.LOCK)
+                try {
+                  if (segmentLock.lockWithRetries(1, 0)) {
+                    LOGGER
+                      .info("SIFailedLoadListener: Acquired segment lock on segment:" +
+                            loadMetaDetail.getLoadName)
+                    failedLoadMetadataDetails.add(loadMetaDetail)
+                  }
+                } finally {
+                  segmentLock.unlock()
+                  LOGGER
+                    .info("SIFailedLoadListener: Released segment lock on segment:" +
+                          loadMetaDetail.getLoadName)
+                }
               }
-            } finally {
-              segmentLock.unlock()
-              LOGGER
-                .info("SIFailedLoadListener: Released segment lock on segment:" +
-                  loadMetaDetail.getLoadName)
-            }
           }
-      }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for
SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail
!= null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName +
" for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+
+          // check for the skipped segments. compare the main table and SI table table
+          // status file and get the skipped segments if any
+          CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = mainTableDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI"
+
+                    " table " + indexTableName + "." + carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && metadataDetail
!= null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does not have
+                  // compacted segments due to some failure while compaction, need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on table, then SI
+                    // segments are updated first in table status and then the main table
+                    // segment, so in any load runs parallel this listener shouldn't consider
+                    // those segments accidentally. So try to take the segment lock.
+                    val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName)
+
+                        LockUsage.LOCK)
+                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI
"
+                        + "table " + indexTableName + "." + carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
+            })
+
+          try {
+            if (!failedLoadMetadataDetails.isEmpty) {
+              // in the case when in SI table a segment is deleted and it's entry is
+              // deleted from the tablestatus file, the corresponding .segment file from
+              // the metadata folder should also be deleted as it contains the
+              // mergefilename which does not exist anymore as the segment is deleted.
+              deleteStaleSegmentFileIfPresent(carbonLoadModel,
+                indexTable,
+                failedLoadMetadataDetails)
+              CarbonIndexUtil
+                .LoadToSITable(sparkSession,
+                  carbonLoadModel,
+                  indexTableName,
+                  isLoadToFailedSISegments = true,
+                  indexModel,
+                  carbonTable, indexTable, false, failedLoadMetadataDetails)
             }
-          }
-        })
-      try {
-        if (!failedLoadMetadataDetails.isEmpty) {
-          // in the case when in SI table a segment is deleted and it's entry is
-          // deleted from the tablestatus file, the corresponding .segment file from
-          // the metadata folder should also be deleted as it contains the
-          // mergefilename which does not exist anymore as the segment is deleted.
-          deleteStaleSegmentFileIfPresent(carbonLoadModel,
-            indexTable,
-            failedLoadMetadataDetails)
-          CarbonIndexUtil
-            .LoadToSITable(sparkSession,
-              carbonLoadModel,
-              indexTableName,
-              isLoadToFailedSISegments = true,
-              indexModel,
-              carbonTable, indexTable, false, failedLoadMetadataDetails)
-
-          // get the current load metadata details of the index table
-          // details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-        }
-
-        // get updated main table segments and si table segments
-        val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
 
-        // check if main table has load in progress and SI table has no load
-        // in progress entry, then no need to enable the SI table
-        // Only if the valid segments of maintable match the valid segments of SI
-        // table then we can enable the SI for query
-        if (CarbonInternalLoaderUtil
-          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
-            siTblLoadMetadataDetails)
-          && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
-          mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
-          // enable the SI table if it was disabled earlier due to failure during SI
-          // creation time
-          sparkSession.sql(
-            s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName SET
-               |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
+            // get updated main table segments and si table segments
+            val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+              SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+            val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+              SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+            // check if main table has load in progress and SI table has no load
+            // in progress entry, then no need to enable the SI table
+            // Only if the valid segments of maintable match the valid segments of SI
+            // table then we can enable the SI for query
+            if (CarbonInternalLoaderUtil
+                  .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+                    siTblLoadMetadataDetails)
+                && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+              mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
+              // enable the SI table if it was disabled earlier due to failure during SI
+              // creation time
+              sparkSession.sql(
+                s"""ALTER TABLE ${ carbonLoadModel.getDatabaseName }.$indexTableName SET
+                   |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
+            }
+          } catch {
+            case ex: Exception =>
+              // in case of SI load only for for failed segments, catch the exception, but
+              // do not fail the main table load, as main table segments should be available
+              // for query
+              LOGGER.error(s"Load to SI table to $indexTableName is failed " +
+                           s"or SI table ENABLE is failed. ", ex)
+              Seq.empty
+          } finally {
+            segmentLocks.foreach {
+              segmentLock => segmentLock.unlock()
+            }
+          }
         }
-      } catch {
-        case ex: Exception =>
-          // in case of SI load only for for failed segments, catch the exception, but
-          // do not fail the main table load, as main table segments should be available
-          // for query
-          LOGGER.error(s"Load to SI table to $indexTableName is failed " +
-            s"or SI table ENABLE is failed. ", ex)
-          Seq.empty
-      } finally {
-        segmentLocks.foreach {
-          segmentLock => segmentLock.unlock()
+      } else {
+        LOGGER.error(s"Didn't check failed segments for index [$indexTableName] as compaction
" +
+                     s"is progress on ${ carbonTable.getTableUniqueName }. " +
+                     s"Please call SI repair again")
+        if (!isLoadOrCompaction) {
+          throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+            carbonTable.getTableName, "compaction", "reindex command")
         }
       }
+    } finally {
+      compactionLock.unlock()
     }
     Seq.empty
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index a1e6fc1..a7677ea 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.index.CarbonIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
@@ -70,13 +69,11 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener
with L
               carbonLoadModel.getTableName + " are : " + maxSegmentRepairLimit)
             // if there are no index tables for a given fact table do not perform any action
             if (indexTables.nonEmpty) {
-              val mainTableDetails =
-                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
               indexTables.foreach {
                 indexTableName =>
                   CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
-                    indexMetadata, mainTableDetails.toList, secondaryIndexProvider,
-                    maxSegmentRepairLimit)(sparkSession)
+                    indexMetadata, secondaryIndexProvider,
+                    maxSegmentRepairLimit, isLoadOrCompaction = true)(sparkSession)
               }
             }
           }


Mime
View raw message