carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-3147] Fixed concurrent load issue
Date Tue, 11 Dec 2018 11:05:19 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master fd0885b03 -> f599cb114


[CARBONDATA-3147] Fixed concurrent load issue

Problem: During datamap commit, tablestatus_UUID files were being renamed to tablestatus due
to which any new in progress entry was being skipped and eventually during writing of success
status 'Entry not found exception was thrown'.

Solution: Instead of renaming the files, now we are reading both the files and merging the
contents. Then the updated details are written to the tablestatus file.

This closes #2977


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f599cb11
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f599cb11
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f599cb11

Branch: refs/heads/master
Commit: f599cb1140353004f4ca2fa7e6beb293dd515088
Parents: fd0885b
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Wed Dec 5 16:00:41 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Dec 11 16:35:03 2018 +0530

----------------------------------------------------------------------
 .../spark/rdd/AggregateDataMapCompactor.scala   |  69 ++++++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |   3 +-
 .../management/CarbonLoadDataCommand.scala      |   2 +-
 .../preaaggregate/PreAggregateListeners.scala   | 142 ++++++++++++++-----
 .../preaaggregate/PreAggregateUtil.scala        |   7 +-
 .../processing/util/CarbonLoaderUtil.java       |  10 +-
 6 files changed, 174 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f599cb11/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 2119e4c..6d28295 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -23,11 +23,12 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonSession, SQLContext}
 import org.apache.spark.sql.execution.command.CompactionModel
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.execution.command.preaaggregate.{CommitPreAggregateListener,
PreAggregateUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
@@ -79,20 +80,32 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
           "true")
         loadCommand.processData(sqlContext.sparkSession)
-        val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
+        // After load is completed for child table the UUID table status will have 0.1 as
success
+        // and the table status file will have 0,1,2,3 as Success and 0.1 as In Progress.
+        // Therefore we will read the table status and write 0,1,2,3 as Compacted as the
commit
+        // listener will take care of merging the UUID and the table status files.
+        val newMetadataDetails = SegmentStatusManager.readLoadMetadata(
           carbonTable.getMetadataPath, uuid)
-        val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
-          case load if loadMetaDataDetails.contains(load) =>
-            load.setMergedLoadName(mergedLoadName)
-            load.setSegmentStatus(SegmentStatus.COMPACTED)
-            load.setModificationOrdeletionTimesStamp(System.currentTimeMillis())
-            load
-          case other => other
-        }
+        val mergedContent = loadMetaDataDetails.asScala.map {
+          segment => segment.setSegmentStatus(SegmentStatus.COMPACTED)
+            segment.setMergedLoadName(mergedLoadName)
+            segment.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime)
+            segment
+        } ++ newMetadataDetails
         SegmentStatusManager.writeLoadDetailsIntoFile(
           CarbonTablePath.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid),
-            updatedLoadMetaDataDetails)
-        carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
+          mergedContent.toArray)
+        carbonLoadModel.setLoadMetadataDetails((carbonLoadModel.getLoadMetadataDetails.asScala
++
+        newMetadataDetails).asJava)
+        // If isCompaction is true then it means that the compaction on aggregate table was
+        // triggered by the maintable thus no need to commit the tablestatus file but if
the
+        // compaction was triggered directly for aggregate table then commit has to be fired
as
+        // the commit listener would not be called.
+        val directAggregateCompactionCall = Option(operationContext
+          .getProperty("isCompaction")).getOrElse("false").toString.toBoolean
+        if (!directAggregateCompactionCall) {
+          commitAggregateTableStatus(carbonTable, uuid)
+        }
       } finally {
         // check if any other segments needs compaction on in case of MINOR_COMPACTION.
         // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold
@@ -112,12 +125,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         if (!compactionModel.compactionType.equals(CompactionType.MAJOR) &&
           !compactionModel.compactionType.equals(CompactionType.CUSTOM)) {
           if (!identifySegmentsToBeMerged().isEmpty) {
-            val uuidTableStaus = CarbonTablePath.getTableStatusFilePathWithUUID(
-              carbonTable.getTablePath, uuid)
-            val tableStatus = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
-            if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
-              FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
-            }
+            commitAggregateTableStatus(carbonTable, uuid)
             executeCompaction()
           }
         }
@@ -133,4 +141,27 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       }
     }
   }
+
+  /**
+   * Used to merge the contents of tablestatus and tablestatus_uuid files and write the new
+   * details to tablestatus file. For Example:-
+   * tablestatus contents are = 0(Success), 1(Success),2(Success),3(Success), 0.1(In Progress)
+   * tablestatus_uuid contents are = 0(Compacted), 1(Compacted),2(Compacted),3(Compacted),
0.1
+   * (Success).
+   *
+   * So after merging the tablestatus file will have: 0(Compacted), 1(Compacted),2(Compacted),
+   * 3(Compacted), 0.1(Success).
+   *
+   * NOTE: This method will be called when direct compaction is fired on child aggregate
table or
+   * when there are anymore segments to be compacted and the intermediate state of the
+   * tablestatus has to be committed for further compaction to pick other segments.
+   */
+  private def commitAggregateTableStatus(carbonTable: CarbonTable, uuid: String) {
+    if (!CommitPreAggregateListener.mergeTableStatusContents(carbonTable, CarbonTablePath
+      .getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid), CarbonTablePath
+      .getTableStatusFilePathWithUUID(carbonTable.getTablePath, ""))) {
+      throw new RuntimeException("Unable to acquire lock for table status updation")
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f599cb11/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7e2e7d9..b07be72 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -461,7 +461,8 @@ object CarbonDataRDDFactory {
       }
       return null
     }
-    val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String]
+    val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
+      .asInstanceOf[String]
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
       // update the load entry in table status file for changing the status to marked for
delete
       CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f599cb11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index af406bb..6a8a9cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -349,7 +349,7 @@ case class CarbonLoadDataCommand(
       case ex: Exception =>
         LOGGER.error(ex)
         // update the load entry in table status file for changing the status to marked for
delete
-        if (isUpdateTableStatusRequired) {
+        if (isUpdateTableStatusRequired && !table.isChildDataMap) {
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
         throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f599cb11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index f606c04..3038c93 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -31,9 +31,11 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
@@ -69,16 +71,32 @@ trait CommitHelper {
   protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
       operationContext: OperationContext,
       carbonTable: CarbonTable): Unit = {
-    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
-    val segmentBeingLoaded =
-      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
-    val newDetails = loadMetaDataDetails.collect {
-      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
-        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
-        detail
-      case others => others
+    val lockFile: ICarbonLock = new SegmentStatusManager(carbonTable
+      .getAbsoluteTableIdentifier).getTableStatusLock
+    val retryCount = CarbonLockUtil
+      .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+        CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
+    val maxTimeout = CarbonLockUtil
+      .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+        CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
+    try {
+      if (lockFile.lockWithRetries(retryCount, maxTimeout)) {
+        val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+        val segmentBeingLoaded =
+          operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+        val newDetails = loadMetaDataDetails.collect {
+          case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+            detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+            detail
+          case others => others
+        }
+        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+      } else {
+        throw new RuntimeException("Uable to update table status file")
+      }
+    } finally {
+      lockFile.unlock()
     }
-    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
   }
 
   /**
@@ -111,6 +129,43 @@ trait CommitHelper {
     }
   }
 
+  def mergeTableStatusContents(carbonTable: CarbonTable, uuidTableStatusPath: String,
+      tableStatusPath: String): Boolean = {
+    val lockFile: ICarbonLock = new SegmentStatusManager(carbonTable
+      .getAbsoluteTableIdentifier).getTableStatusLock
+    try {
+      val retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
+      val maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
+      if (lockFile.lockWithRetries(retryCount, maxTimeout)) {
+        val tableStatusContents = SegmentStatusManager.readTableStatusFile(tableStatusPath)
+        val newLoadContent = SegmentStatusManager.readTableStatusFile(uuidTableStatusPath)
+        val mergedContent = tableStatusContents.collect {
+          case content =>
+            val contentIndex = newLoadContent.indexOf(content)
+            if (contentIndex == -1) {
+              content
+            } else {
+              newLoadContent(contentIndex)
+            }
+        }
+        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, mergedContent)
+        true
+      } else {
+        false
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("Exception occurred while merging files", ex)
+        false
+    } finally {
+      lockFile.unlock()
+    }
+  }
+
   /**
    * Used to remove table status files with UUID and segment folders.
    */
@@ -156,20 +211,19 @@ object AlterTableDropPartitionPostStatusListener extends OperationEventListener
             // Generate table status file name without UUID, forExample: tablestatus
             val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
               childCarbonTable.getTablePath)
-            renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+            mergeTableStatusContents(childCarbonTable, oldTableSchemaPath, newTableSchemaPath)
         }
         // if true then the commit for one of the child tables has failed
         val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
         if (commitFailed) {
-          LOGGER.info("Reverting table status file to original state")
-          renamedDataMaps.foreach {
-            command =>
-              val carbonTable = command.table
-              // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
-              val backupTableSchemaPath =
-                CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_"
+ uuid
-              val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
-              renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+          LOGGER.warn("Reverting table status file to original state")
+          childCommands.foreach {
+            childDropCommand =>
+              val tableStatusPath = CarbonTablePath.getTableStatusFilePath(
+                childDropCommand.table.getTablePath)
+              markInProgressSegmentAsDeleted(tableStatusPath,
+                operationContext,
+                childDropCommand.table)
           }
         }
         commitFailed
@@ -200,7 +254,6 @@ object AlterTableDropPartitionMetaListener extends OperationEventListener{
     val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
     val parentCarbonTable = dropPartitionEvent.parentCarbonTable
     val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
-    val sparkSession = SparkSession.getActiveSession.get
     if (parentCarbonTable.hasAggregationDataMap) {
       // used as a flag to block direct drop partition on aggregate tables fired by the user
       operationContext.setProperty("isInternalDropCall", "true")
@@ -325,7 +378,7 @@ object CompactionProcessMetaListener extends OperationEventListener {
         childDataFrame,
         false,
         sparkSession)
-      val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+      val uuid = Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
       loadCommand.processMetadata(sparkSession)
       operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
       operationContext.setProperty("uuid", uuid)
@@ -460,7 +513,7 @@ object LoadPostAggregateListener extends OperationEventListener {
           .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
         // sorting the datamap for timeseries rollup
         val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
-        for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
+        val successDataMaps = sortedList.takeWhile { dataMapSchema =>
           val childLoadCommand = operationContext
             .getProperty(dataMapSchema.getChildSchema.getTableName)
             .asInstanceOf[CarbonLoadDataCommand]
@@ -493,9 +546,32 @@ object LoadPostAggregateListener extends OperationEventListener {
             isOverwrite,
             sparkSession)
         }
+        val loadFailed = successDataMaps.lengthCompare(sortedList.length) != 0
+        if (loadFailed) {
+          successDataMaps.foreach(dataMapSchema => markSuccessSegmentsAsFailed(operationContext
+            .getProperty(dataMapSchema.getChildSchema.getTableName)
+            .asInstanceOf[CarbonLoadDataCommand]))
+          throw new RuntimeException(
+            "Data Load failed for DataMap. Please check logs for the failure")
+        }
       }
     }
   }
+
+  private def markSuccessSegmentsAsFailed(childLoadCommand: CarbonLoadDataCommand) {
+    val segmentToRevert = childLoadCommand.operationContext
+      .getProperty(childLoadCommand.table.getTableUniqueName + "_Segment")
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(
+      childLoadCommand.table.getTablePath)
+    val tableStatusContents = SegmentStatusManager.readTableStatusFile(tableStatusPath)
+    val updatedLoadDetails = tableStatusContents.collect {
+      case content if content.getLoadName == segmentToRevert =>
+        content.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+        content
+      case others => others
+    }
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, updatedLoadDetails)
+  }
 }
 
 /**
@@ -540,7 +616,7 @@ object CommitPreAggregateListener extends OperationEventListener with
CommitHelp
           operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
             .asInstanceOf[CarbonLoadDataCommand]
         }
-      }
+    }
     var commitFailed = false
     try {
       if (dataMapSchemas.nonEmpty) {
@@ -554,23 +630,19 @@ object CommitPreAggregateListener extends OperationEventListener with
CommitHelp
           // Generate table status file name without UUID, forExample: tablestatus
           val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
             childCarbonTable.getTablePath)
-          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+          mergeTableStatusContents(childCarbonTable, oldTableSchemaPath, newTableSchemaPath)
         }
         // if true then the commit for one of the child tables has failed
         commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
         if (commitFailed) {
           LOGGER.warn("Reverting table status file to original state")
-          renamedDataMaps.foreach {
-            loadCommand =>
-              val carbonTable = loadCommand.table
-              // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
-              val backupTableSchemaPath =
-                CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_"
+
-                uuid
-              val tableSchemaPath = CarbonTablePath
-                .getTableStatusFilePath(carbonTable.getTablePath)
-              markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, carbonTable)
-              renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+          childLoadCommands.foreach {
+            childLoadCommand =>
+              val tableStatusPath = CarbonTablePath.getTableStatusFilePath(
+                childLoadCommand.table.getTablePath)
+              markInProgressSegmentAsDeleted(tableStatusPath,
+                operationContext,
+                childLoadCommand.table)
           }
         }
         // after success/failure of commit delete all tablestatus files with UUID in their
names.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f599cb11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index d16f570..0599fb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -587,7 +587,7 @@ object PreAggregateUtil {
       validateSegments: Boolean,
       loadCommand: CarbonLoadDataCommand,
       isOverwrite: Boolean,
-      sparkSession: SparkSession): Unit = {
+      sparkSession: SparkSession): Boolean = {
     CarbonSession.threadSet(
       CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
       parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "."
+
@@ -601,6 +601,11 @@ object PreAggregateUtil {
       "true")
     try {
       loadCommand.processData(sparkSession)
+      true
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("Data Load failed for DataMap: ", ex)
+        false
     } finally {
       CarbonSession.threadUnset(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f599cb11/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 2563768..64fcaa2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -339,8 +339,14 @@ public final class CarbonLoaderUtil {
           }
         }
 
-        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
-            .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+        if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() &&
!loadStartEntry
+            && !uuid.isEmpty() && segmentsToBeDeleted.isEmpty() &&
!insertOverwrite) {
+          SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath,
+              new LoadMetadataDetails[] { newMetaEntry });
+        } else {
+          SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+              .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+        }
         // Delete all old stale segment folders
         for (CarbonFile staleFolder : staleFolders) {
           // try block is inside for loop because even if there is failure in deletion of
1 stale


Mime
View raw message