carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-2012] Add support to load pre-aggregate in one transaction
Date Thu, 01 Feb 2018 09:12:21 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master c9a02fc2a -> d680e9cf5


[CARBONDATA-2012] Add support to load pre-aggregate in one transaction

Currently, if a table (t1) has 2 pre-aggregate tables (p1, p2), then while loading, all the pre-aggregate tables are committed (table status writing) first, and only then is the parent table committed.

After this PR the flow would be like this:

load t1
load p1
load p2
write table status for p2 with transactionID
write table status for p1 with transactionID
rename tablestatus_UUID to tablestatus for p2
rename tablestatus_UUID to tablestatus for p1
write table status for t1

This closes #1781


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d680e9cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d680e9cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d680e9cf

Branch: refs/heads/master
Commit: d680e9cf5016475e6e9b320c27be6503e1c6e66c
Parents: c9a02fc
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Mon Jan 15 14:35:56 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Feb 1 14:42:05 2018 +0530

----------------------------------------------------------------------
 .../datastore/filesystem/LocalCarbonFile.java   |   2 +-
 .../statusmanager/SegmentStatusManager.java     |  29 ++-
 .../core/util/path/CarbonTablePath.java         |   8 +
 .../hadoop/api/CarbonOutputCommitter.java       |   4 +
 .../carbondata/events/AlterTableEvents.scala    |  10 +
 .../spark/rdd/AggregateDataMapCompactor.scala   |  31 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  37 +++-
 .../spark/rdd/CarbonTableCompactor.scala        |  33 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +-
 .../management/CarbonLoadDataCommand.scala      |  25 ++-
 .../CreatePreAggregateTableCommand.scala        |   7 +-
 .../preaaggregate/PreAggregateListeners.scala   | 220 +++++++++++++++++--
 .../preaaggregate/PreAggregateUtil.scala        |  35 +--
 .../processing/loading/events/LoadEvents.java   |  13 ++
 .../processing/util/CarbonLoaderUtil.java       |  49 ++++-
 15 files changed, 431 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 4ce78be..5df5a81 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -233,7 +233,7 @@ public class LocalCarbonFile implements CarbonFile {
 
   @Override public boolean renameForce(String changetoName) {
     File destFile = new File(changetoName);
-    if (destFile.exists()) {
+    if (destFile.exists() && !file.getAbsolutePath().equals(destFile.getAbsolutePath())) {
       if (destFile.delete()) {
         return file.renameTo(new File(changetoName));
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6af0304..01f810e 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -178,23 +178,42 @@ public class SegmentStatusManager {
    * @return
    */
   public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) {
+    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.LOADMETADATA_FILENAME;
+    return readTableStatusFile(metadataFileName);
+  }
+
+  /**
+   * Reads the table status file with the specified UUID if non empty.
+   */
+  public static LoadMetadataDetails[] readLoadMetadata(String metaDataFolderPath, String uuid) {
+    String tableStatusFileName;
+    if (uuid.isEmpty()) {
+      tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.LOADMETADATA_FILENAME;
+    } else {
+      tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.LOADMETADATA_FILENAME + CarbonCommonConstants.UNDERSCORE + uuid;
+    }
+    return readTableStatusFile(tableStatusFileName);
+  }
+
+  public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
-    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.LOADMETADATA_FILENAME;
     LoadMetadataDetails[] listOfLoadFolderDetailsArray;
     AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(metadataFileName, FileFactory.getFileType(metadataFileName));
+        new AtomicFileOperationsImpl(tableStatusPath, FileFactory.getFileType(tableStatusPath));
 
     try {
-      if (!FileFactory.isFileExist(metadataFileName, FileFactory.getFileType(metadataFileName))) {
+      if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
         return new LoadMetadataDetails[0];
       }
       dataInputStream = fileOperation.openForRead();
       inStream = new InputStreamReader(dataInputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
       buffReader = new BufferedReader(inStream);
       listOfLoadFolderDetailsArray =
           gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 9e66657..fab6289 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -252,6 +252,14 @@ public class CarbonTablePath extends Path {
     return getMetaDataDir() + File.separator + TABLE_STATUS_FILE;
   }
 
+  public String getTableStatusFilePathWithUUID(String uuid) {
+    if (!uuid.isEmpty()) {
+      return getTableStatusFilePath() + CarbonCommonConstants.UNDERSCORE + uuid;
+    } else {
+      return getTableStatusFilePath();
+    }
+  }
+
   /**
    * Gets absolute path of data file
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index f6e928d..9cca1bb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -115,8 +115,12 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
         LoadEvents.LoadTablePreStatusUpdateEvent event =
             new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
                 loadModel);
+        LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
+            new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
         try {
           OperationListenerBus.getInstance().fireEvent(event, (OperationContext) operationContext);
+          OperationListenerBus.getInstance().fireEvent(postStatusUpdateEvent,
+              (OperationContext) operationContext);
         } catch (Exception e) {
           throw new IOException(e);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 30e3f6f..ca1948a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -182,6 +182,16 @@ case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
     mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
 
 /**
+ * Compaction Event for handling post update status file operations, like committing child
+ * datamaps in one transaction
+ */
+case class AlterTableCompactionPostStatusUpdateEvent(
+    carbonTable: CarbonTable,
+    carbonMergerMapping: CarbonMergerMapping,
+    carbonLoadModel: CarbonLoadModel,
+    mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
+
+/**
  * Compaction Event for handling clean up in case of any compaction failure and abort the
  * operation, lister has to implement this event to handle failure scenarios
  *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 5f8f389..188e776 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.OperationContext
@@ -61,6 +62,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
       val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction")
         .asInstanceOf[CarbonLoadDataCommand]
+      val uuid = Option(loadCommand.operationContext.getProperty("uuid")).getOrElse("").toString
       try {
         val newInternalOptions = loadCommand.internalOptions ++
                                  Map("mergedSegmentName" -> mergedLoadName)
@@ -70,7 +72,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
                     sqlContext.sparkSession, loadCommand.logicalPlan.get))
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath)
+          carbonTable.getMetaDataFilepath, uuid)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
           case load if loadMetaDataDetails.contains(load) =>
             load.setMergedLoadName(mergedLoadName)
@@ -83,16 +85,37 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
             .getAbsoluteTableIdentifier)
         SegmentStatusManager
-          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
+          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
             updatedLoadMetaDataDetails)
         carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
       } finally {
         // check if any other segments needs compaction on in case of MINOR_COMPACTION.
         // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold
         // allows it.
+        // Also as the load which will be fired for 2nd level compaction will read the
+        // tablestatus file and not the tablestatus_UUID therefore we have to commit the
+        // intermediate tablestatus file for 2nd level compaction to be successful.
+        // This is required because:
+        //  1. after doing 12 loads and a compaction after every 4 loads the table status file will
+        //     have 0.1, 4.1, 8, 9, 10, 11 as Success segments. While tablestatus_UUID will have
+        //     0.1, 4.1, 8.1.
+        //  2. Now for 2nd level compaction 0.1, 8.1, 4.1 have to be merged to 0.2. therefore we
+        //     need to read the tablestatus_UUID. But load flow should always read tablestatus file
+        //     because it contains the actual In-Process status for the segments.
+        //  3. If we read the tablestatus then 8, 9, 10, 11 will keep getting compacted into 8.1.
+        //  4. Therefore tablestatus file will be committed in between multiple commits.
         if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
-
-          executeCompaction()
+          if (!identifySegmentsToBeMerged().isEmpty) {
+            val carbonTablePath = CarbonStorePath
+              .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+                .getAbsoluteTableIdentifier)
+            val uuidTableStaus = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+            val tableStatus = carbonTablePath.getTableStatusFilePath
+            if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
+              FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
+            }
+            executeCompaction()
+          }
         }
         CarbonSession
           .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8212e85..3de0e70 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
@@ -62,7 +63,7 @@ import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
@@ -491,9 +492,10 @@ object CarbonDataRDDFactory {
       }
       return
     }
+    val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String]
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
       // update the load entry in table status file for changing the status to marked for delete
-      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
       LOGGER.info("********clean up done**********")
@@ -508,7 +510,7 @@ object CarbonDataRDDFactory {
           status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
           carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
         // update the load entry in table status file for changing the status to marked for delete
-        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -532,6 +534,8 @@ object CarbonDataRDDFactory {
       }
 
       writeDictionary(carbonLoadModel, result, writeAll = false)
+      operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+        carbonLoadModel.getSegmentId)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
         new LoadTablePreStatusUpdateEvent(
         carbonTable.getCarbonTableIdentifier,
@@ -543,9 +547,21 @@ object CarbonDataRDDFactory {
           carbonLoadModel,
           loadStatus,
           newEntryLoadStatus,
-          overwriteTable)
-      if (!done) {
-        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+          overwriteTable,
+          uniqueTableStatusId)
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+      val commitComplete = try {
+        OperationListenerBus.getInstance()
+          .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+        true
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex, "Problem while committing data maps")
+          false
+      }
+      if (!done && !commitComplete) {
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -731,7 +747,8 @@ object CarbonDataRDDFactory {
       operationContext: OperationContext): Unit = {
     LOGGER.info(s"compaction need status is" +
                 s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
-    if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
+    if (!carbonTable.isChildDataMap &&
+        CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
       LOGGER.audit(s"Compaction request received for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0
@@ -805,7 +822,8 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       loadStatus: SegmentStatus,
       newEntryLoadStatus: SegmentStatus,
-      overwriteTable: Boolean): Boolean = {
+      overwriteTable: Boolean,
+      uuid: String = ""): Boolean = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
       status(0)._2._1
@@ -820,7 +838,7 @@ object CarbonDataRDDFactory {
     CarbonLoaderUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
     val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
-      overwriteTable)
+      overwriteTable, uuid)
     if (!done) {
       val errorMessage = s"Dataload failed due to failure in table status updation for" +
                          s" ${carbonLoadModel.getTableName}"
@@ -835,7 +853,6 @@ object CarbonDataRDDFactory {
     done
   }
 
-
   /**
    * repartition the input data for partition table.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a0c8f65..8406d8d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -32,10 +32,12 @@ import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -245,8 +247,33 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         CarbonDataMergerUtil
           .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
             mergedLoadNumber, carbonLoadModel, compactionType)
-
-      if (!statusFileUpdation) {
+      val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
+        carbonMergerMapping,
+        carbonLoadModel,
+        mergedLoadName)
+      // Used to inform the commit listener that the commit is fired from compaction flow.
+      operationContext.setProperty("isCompaction", "true")
+      val commitComplete = try {
+        // Once main table compaction is done and 0.1, 4.1, 8.1 is created commit will happen for
+        // all the tables. The commit listener will compact the child tables until no more segments
+        // are left. But 2nd level compaction is yet to happen on the main table therefore again the
+        // compaction flow will try to commit the child tables which is wrong. This check tell the
+        // 2nd level compaction flow that the commit for datamaps is already done.
+        val isCommitDone = operationContext.getProperty("commitComplete")
+        if (isCommitDone != null) {
+          isCommitDone.toString.toBoolean
+        } else {
+          OperationListenerBus.getInstance()
+            .fireEvent(compactionLoadStatusPostEvent, operationContext)
+          true
+        }
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex, "Problem while committing data maps")
+          false
+      }
+      operationContext.setProperty("commitComplete", commitComplete)
+      if (!statusFileUpdation && !commitComplete) {
         LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 870b1f3..40035ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -148,6 +148,8 @@ object CarbonEnv {
         AlterPreAggregateTableCompactionPostListener)
       .addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener)
       .addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener)
+      .addListener(classOf[LoadTablePostStatusUpdateEvent], CommitPreAggregateListener)
+      .addListener(classOf[AlterTableCompactionPostStatusUpdateEvent], CommitPreAggregateListener)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 226a625..8e6c20e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.management
 
 import java.text.SimpleDateFormat
 import java.util
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -35,8 +36,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
@@ -119,6 +120,7 @@ case class CarbonLoadDataCommand(
     }
     Seq.empty
   }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
@@ -176,7 +178,18 @@ case class CarbonLoadDataCommand(
       LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
       TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
       var isUpdateTableStatusRequired = false
+      // if the table is child then extract the uuid from the operation context and the parent would
+      // already generated UUID.
+      // if parent table then generate a new UUID else use empty.
+      val uuid = if (table.isChildDataMap) {
+        Option(operationContext.getProperty("uuid")).getOrElse("").toString
+      } else if (table.hasAggregationDataMap) {
+        UUID.randomUUID().toString
+      } else {
+        ""
+      }
       try {
+        operationContext.setProperty("uuid", uuid)
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
           new LoadTablePreExecutionEvent(
             table.getCarbonTableIdentifier,
@@ -194,7 +207,9 @@ case class CarbonLoadDataCommand(
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
-          CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
+          CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+            carbonLoadModel,
+            isOverwriteTable)
           isUpdateTableStatusRequired = true
         }
         if (isOverwriteTable) {
@@ -252,7 +267,7 @@ case class CarbonLoadDataCommand(
         case CausedBy(ex: NoRetryException) =>
           // update the load entry in table status file for changing the status to marked for delete
           if (isUpdateTableStatusRequired) {
-            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
           }
           LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
           throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
@@ -263,7 +278,7 @@ case class CarbonLoadDataCommand(
           LOGGER.error(ex)
           // update the load entry in table status file for changing the status to marked for delete
           if (isUpdateTableStatusRequired) {
-            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
           }
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dbbf90c..3de75c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -205,11 +205,12 @@ case class CreatePreAggregateTableCommand(
       loadCommand.dataFrame = Some(PreAggregateUtil
         .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
       PreAggregateUtil.startDataLoadForDataMap(
-        parentTable,
+        TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
         segmentToLoad = "*",
         validateSegments = true,
-        sparkSession,
-        loadCommand)
+        loadCommand,
+        isOverwrite = false,
+        sparkSession)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 7b273ba..ed6be97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -17,19 +17,26 @@
 
 package org.apache.spark.sql.execution.command.preaaggregate
 
+import java.util.UUID
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
 /**
  * below class will be used to create load command for compaction
@@ -71,9 +78,13 @@ object CompactionProcessMetaListener extends OperationEventListener {
           childDataFrame,
           false,
           sparkSession)
+        val uuid = Option(operationContext.getProperty("uuid")).
+          getOrElse(UUID.randomUUID()).toString
+        operationContext.setProperty("uuid", uuid)
         loadCommand.processMetadata(sparkSession)
         operationContext
           .setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
+        loadCommand.operationContext = operationContext
       }
     } else if (table.isChildDataMap) {
       val childTableName = table.getTableName
@@ -95,9 +106,13 @@ object CompactionProcessMetaListener extends OperationEventListener {
         childDataFrame,
         false,
         sparkSession)
+      val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
       loadCommand.processMetadata(sparkSession)
       operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
+      operationContext.setProperty("uuid", uuid)
+      loadCommand.operationContext = operationContext
     }
+
   }
 }
 
@@ -127,12 +142,17 @@ object LoadProcessMetaListener extends OperationEventListener {
         val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
         val parentTableName = table.getTableName
         val databaseName = table.getDatabaseName
+        // Reuse the UUID already stored in the operation context, if any;
+        // otherwise generate a new UUID to identify this load.
+        // The same UUID is set on every child load command created below.
+        val uuid =
+          Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
         val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
         for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
           val childTableName = dataMapSchema.getRelationIdentifier.getTableName
           val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
           val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
-            PreAggregateUtil.getChildQuery(dataMapSchema)
+            (PreAggregateUtil.getChildQuery(dataMapSchema), "")
           } else {
             // for timeseries rollup policy
             val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
@@ -140,18 +160,19 @@ object LoadProcessMetaListener extends OperationEventListener {
             list += dataMapSchema
             // if non of the rollup data map is selected hit the maintable and prepare query
             if (tableSelectedForRollup.isEmpty) {
-              PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
+              (PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
                 parentTableName,
-                databaseName)
+                databaseName), "")
             } else {
               // otherwise hit the select rollup datamap schema
-              PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
+              (PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
                 tableSelectedForRollup.get,
-                databaseName)
+                databaseName),
+                s"$databaseName.${tableSelectedForRollup.get.getChildSchema.getTableName}")
             }
           }
           val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-            childSelectQuery)).drop("preAggLoad")
+            childSelectQuery._1)).drop("preAggLoad")
           val isOverwrite =
             operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
           val loadCommand = PreAggregateUtil.createLoadCommandForChild(
@@ -159,7 +180,10 @@ object LoadProcessMetaListener extends OperationEventListener {
             TableIdentifier(childTableName, Some(childDatabaseName)),
             childDataFrame,
             isOverwrite,
-            sparkSession)
+            sparkSession,
+            timeseriesParentTableName = childSelectQuery._2)
+          operationContext.setProperty("uuid", uuid)
+          loadCommand.operationContext.setProperty("uuid", uuid)
           loadCommand.processMetadata(sparkSession)
           operationContext.setProperty(dataMapSchema.getChildSchema.getTableName, loadCommand)
         }
@@ -191,25 +215,172 @@ object LoadPostAggregateListener extends OperationEventListener {
           .asInstanceOf[CarbonLoadDataCommand]
         childLoadCommand.dataFrame = Some(PreAggregateUtil
           .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
-        val childOperationContext = new OperationContext
-        childOperationContext
-          .setProperty(dataMapSchema.getChildSchema.getTableName,
-            operationContext.getProperty(dataMapSchema.getChildSchema.getTableName))
         val isOverwrite =
           operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
-        childOperationContext.setProperty("isOverwrite", isOverwrite)
-        childOperationContext.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction",
-          operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction"))
-        childLoadCommand.operationContext = childOperationContext
+        childLoadCommand.operationContext = operationContext
+        val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent")
+        val (parentTableIdentifier, segmentToLoad) =
+          if (timeseriesParent.isDefined && timeseriesParent.get.nonEmpty) {
+            val (parentTableDatabase, parentTableName) =
+              (timeseriesParent.get.split('.')(0), timeseriesParent.get.split('.')(1))
+            (TableIdentifier(parentTableName, Some(parentTableDatabase)),
+            operationContext.getProperty(
+              s"${parentTableDatabase}_${parentTableName}_Segment").toString)
+        } else {
+            (TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
+              carbonLoadModel.getSegmentId)
+        }
         PreAggregateUtil.startDataLoadForDataMap(
-            table,
-            carbonLoadModel.getSegmentId,
+            parentTableIdentifier,
+            segmentToLoad,
             validateSegments = false,
-            sparkSession,
-          childLoadCommand)
+            childLoadCommand,
+            isOverwrite,
+            sparkSession)
+        }
+      }
+    }
+}
+
+/**
+ * This listener is used to commit all the child data aggregate tables in one transaction. If one
+ * fails, all will be reverted to their original state.
+ */
+object CommitPreAggregateListener extends OperationEventListener {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    // The same listener is called for both compaction and load therefore getting the
+    // carbonLoadModel from the appropriate event.
+    val carbonLoadModel = event match {
+      case loadEvent: LoadTablePostStatusUpdateEvent =>
+        loadEvent.getCarbonLoadModel
+      case compactionEvent: AlterTableCompactionPostStatusUpdateEvent =>
+        compactionEvent.carbonLoadModel
+    }
+    val isCompactionFlow = Option(
+      operationContext.getProperty("isCompaction")).getOrElse("false").toString.toBoolean
+    val dataMapSchemas =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getDataMapSchemaList
+    // extract all child LoadCommands
+    val childLoadCommands = if (!isCompactionFlow) {
+      // If not compaction flow then the key for load commands will be tableName
+        dataMapSchemas.asScala.map { dataMapSchema =>
+          operationContext.getProperty(dataMapSchema.getChildSchema.getTableName)
+            .asInstanceOf[CarbonLoadDataCommand]
+        }
+      } else {
+      // If compaction flow then the key for load commands will be tableName_Compaction
+        dataMapSchemas.asScala.map { dataMapSchema =>
+          operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
+            .asInstanceOf[CarbonLoadDataCommand]
+        }
+      }
+     if (dataMapSchemas.size() > 0) {
+       val uuid = operationContext.getProperty("uuid").toString
+      // keep committing until one fails
+      val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
+        val childCarbonTable = childLoadCommand.table
+        val carbonTablePath =
+          new CarbonTablePath(childCarbonTable.getCarbonTableIdentifier,
+            childCarbonTable.getTablePath)
+        // Generate table status file name with UUID, for example: tablestatus_1
+        val oldTableSchemaPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+        // Generate table status file name without UUID, for example: tablestatus
+        val newTableSchemaPath = carbonTablePath.getTableStatusFilePath
+        renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+      }
+      // if true then the commit for one of the child tables has failed
+      val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.size()) != 0
+      if (commitFailed) {
+        LOGGER.warn("Reverting table status file to original state")
+        renamedDataMaps.foreach {
+          loadCommand =>
+            val carbonTable = loadCommand.table
+            val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+              carbonTable.getTablePath)
+            // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
+            val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath + "_backup_" + uuid
+            val tableSchemaPath = carbonTablePath.getTableStatusFilePath
+            markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, loadCommand)
+            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
         }
       }
+      // after success/failure of commit delete all tablestatus files with UUID in their names.
+      // if commit failed then remove the segment directory
+      cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
+        operationContext,
+        uuid)
+      if (commitFailed) {
+        sys.error("Failed to update table status for pre-aggregate table")
+      }
+    }
+
+
+  }
+
+  private def markInProgressSegmentAsDeleted(tableStatusFile: String,
+      operationContext: OperationContext,
+      loadDataCommand: CarbonLoadDataCommand): Unit = {
+    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+    val segmentBeingLoaded =
+      operationContext.getProperty(loadDataCommand.table.getTableUniqueName + "_Segment").toString
+    val newDetails = loadMetaDataDetails.collect {
+      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+        detail
+      case others => others
+    }
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+  }
+
+  /**
+   *  Used to rename table status files for commit operation.
+   */
+  private def renameDataMapTableStatusFiles(sourceFileName: String,
+      destinationFileName: String, uuid: String) = {
+    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+      val backUpPostFix = if (uuid.nonEmpty) {
+        "_backup_" + uuid
+      } else {
+        ""
+      }
+      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+        oldCarbonFile.renameForce(destinationFileName)
+      } else {
+        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+        false
+      }
+    } else {
+      false
     }
+  }
+
+  /**
+   * Used to remove table status files with UUID and segment folders.
+   */
+  private def cleanUpStaleTableStatusFiles(
+      childTables: Seq[CarbonTable],
+      operationContext: OperationContext,
+      uuid: String): Unit = {
+    childTables.foreach { childTable =>
+      val carbonTablePath = new CarbonTablePath(childTable.getCarbonTableIdentifier,
+        childTable.getTablePath)
+      val metaDataDir = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath)
+      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(uuid) || file.getName.contains("backup")
+        }
+      })
+      tableStatusFiles.foreach(_.delete())
+    }
+  }
 }
 
 /**
@@ -226,6 +397,7 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
     val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
     val carbonTable = compactionEvent.carbonTable
     val compactionType = compactionEvent.carbonMergerMapping.campactionType
+    val carbonLoadModel = compactionEvent.carbonLoadModel
     val sparkSession = compactionEvent.sparkSession
     if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
@@ -236,6 +408,10 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
           compactionType.toString,
           Some(System.currentTimeMillis()),
           "")
+        operationContext.setProperty(
+          dataMapSchema.getRelationIdentifier.getDatabaseName + "_" +
+          dataMapSchema.getRelationIdentifier.getTableName + "_Segment",
+          carbonLoadModel.getSegmentId)
         CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext)
           .run(sparkSession)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index dac5d5e..1d4ebec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -35,13 +35,16 @@ import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -581,32 +584,33 @@ object PreAggregateUtil {
    * This method will start load process on the data map
    */
   def startDataLoadForDataMap(
-      parentCarbonTable: CarbonTable,
+      parentTableIdentifier: TableIdentifier,
       segmentToLoad: String,
       validateSegments: Boolean,
-      sparkSession: SparkSession,
-      loadCommand: CarbonLoadDataCommand): Unit = {
+      loadCommand: CarbonLoadDataCommand,
+      isOverwrite: Boolean,
+      sparkSession: SparkSession): Unit = {
     CarbonSession.threadSet(
       CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-      parentCarbonTable.getDatabaseName + "." +
-      parentCarbonTable.getTableName,
+      parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+      parentTableIdentifier.table,
       segmentToLoad)
     CarbonSession.threadSet(
       CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-      parentCarbonTable.getDatabaseName + "." +
-      parentCarbonTable.getTableName, validateSegments.toString)
+      parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+      parentTableIdentifier.table, validateSegments.toString)
     CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
     try {
       loadCommand.processData(sparkSession)
     } finally {
       CarbonSession.threadUnset(
         CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-        parentCarbonTable.getDatabaseName + "." +
-        parentCarbonTable.getTableName)
+        parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+        parentTableIdentifier.table)
       CarbonSession.threadUnset(
         CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-        parentCarbonTable.getDatabaseName + "." +
-        parentCarbonTable.getTableName)
+        parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+        parentTableIdentifier.table)
     }
   }
 
@@ -885,7 +889,8 @@ object PreAggregateUtil {
       dataMapIdentifier: TableIdentifier,
       dataFrame: DataFrame,
       isOverwrite: Boolean,
-      sparkSession: SparkSession): CarbonLoadDataCommand = {
+      sparkSession: SparkSession,
+      timeseriesParentTableName: String = ""): CarbonLoadDataCommand = {
     val headers = columns.asScala.filter { column =>
       !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
     }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
@@ -896,7 +901,8 @@ object PreAggregateUtil {
       Map("fileheader" -> headers),
       isOverwriteTable = isOverwrite,
       dataFrame = None,
-      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
+      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
+        "timeseriesParent" -> timeseriesParentTableName),
       logicalPlan = Some(dataFrame.queryExecution.logical))
     loadCommand
   }
@@ -904,4 +910,5 @@ object PreAggregateUtil {
   def getDataFrame(sparkSession: SparkSession, child: LogicalPlan): DataFrame = {
     Dataset.ofRows(sparkSession, child)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 78964e7..190c72c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -147,6 +147,19 @@ public class LoadEvents {
       return carbonTable;
     }
   }
+
+  public static class LoadTablePostStatusUpdateEvent extends Event {
+    private CarbonLoadModel carbonLoadModel;
+
+    public LoadTablePostStatusUpdateEvent(CarbonLoadModel carbonLoadModel) {
+      this.carbonLoadModel = carbonLoadModel;
+    }
+
+    public CarbonLoadModel getCarbonLoadModel() {
+      return carbonLoadModel;
+    }
+  }
+
   /**
    * Class for handling clean up in case of any failure and abort the operation.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 12fc5c1..3a83427 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -150,6 +150,22 @@ public final class CarbonLoaderUtil {
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
       throws IOException {
+    return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "");
+  }
+
+  /**
+   * This API will write the load level metadata for the load management module in order to
+   * manage the load and query execution smoothly.
+   *
+   * @param newMetaEntry
+   * @param loadModel
+   * @param uuid
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
+   */
+  public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
+      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
+      throws IOException {
     boolean status = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -159,7 +175,12 @@ public final class CarbonLoaderUtil {
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType);
     }
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    String tableStatusPath;
+    if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
+      tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+    } else {
+      tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    }
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil
@@ -314,7 +335,6 @@ public final class CarbonLoaderUtil {
         new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
 
     try {
-
       dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
               Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
@@ -367,7 +387,7 @@ public final class CarbonLoaderUtil {
 
 
   public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
-      boolean insertOverwrite) throws IOException {
+      boolean insertOverwrite, String uuid) throws IOException {
     LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
     SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS;
     if (insertOverwrite) {
@@ -381,18 +401,23 @@ public final class CarbonLoaderUtil {
     }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
-    boolean entryAdded =
-        CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
+    boolean entryAdded = CarbonLoaderUtil
+        .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
     if (!entryAdded) {
       throw new IOException("Dataload failed due to failure in table status updation for "
           + model.getTableName());
     }
   }
 
+  public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
+      boolean insertOverwrite) throws IOException {
+    readAndUpdateLoadProgressInTableMeta(model, insertOverwrite, "");
+  }
+
   /**
    * This method will update the load failure entry in the table status file
    */
-  public static void updateTableStatusForFailure(CarbonLoadModel model)
+  public static void updateTableStatusForFailure(CarbonLoadModel model, String uuid)
       throws IOException {
     // in case if failure the load status should be "Marked for delete" so that it will be taken
     // care during clean up
@@ -404,14 +429,22 @@ public final class CarbonLoaderUtil {
     }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
-    boolean entryAdded =
-        CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false);
+    boolean entryAdded = CarbonLoaderUtil.recordNewLoadMetadata(
+        loadMetaEntry, model, false, false, uuid);
     if (!entryAdded) {
       throw new IOException(
           "Failed to update failure entry in table status for " + model.getTableName());
     }
   }
 
+  /**
+   * This method will update the load failure entry in the table status file with empty uuid.
+   */
+  public static void updateTableStatusForFailure(CarbonLoadModel model)
+      throws IOException {
+    updateTableStatusForFailure(model, "");
+  }
+
   public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
       throws IOException {
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =


Mime
View raw message