carbondata-commits mailing list archives

From gvram...@apache.org
Subject [13/22] incubator-carbondata git commit: IUD horizontal compaction of update and delete delta files support
Date Fri, 06 Jan 2017 13:57:13 GMT
IUD horizontal compaction of update and delete delta files support


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f9fb1b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f9fb1b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f9fb1b91

Branch: refs/heads/master
Commit: f9fb1b91eb175cd8d812148c574e3e5bf224b7be
Parents: 3e045d8
Author: sounakr <sounakr@gmail.com>
Authored: Mon Jan 2 11:28:35 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../spark/merger/CarbonDataMergerUtil.java      | 1019 +++++++++++++++++-
 .../merger/CarbonDataMergerUtilResult.java      |   12 +
 .../carbondata/spark/merger/CompactionType.java |   10 +-
 .../spark/merger/RowResultMerger.java           |   64 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |    3 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |    7 +-
 .../spark/rdd/DataManagementFunc.scala          |  112 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   10 +-
 .../sql/execution/command/IUDCommands.scala     |  816 +++++++++++++-
 .../CompactionSystemLockFeatureTest.scala       |   23 +-
 .../fileoperations/AtomicFileOperations.java    |   33 -
 .../AtomicFileOperationsImpl.java               |   87 --
 .../lcm/fileoperations/FileWriteOperation.java  |   25 -
 .../lcm/locks/AbstractCarbonLock.java           |   77 --
 .../carbondata/lcm/locks/CarbonLockFactory.java |   94 --
 .../carbondata/lcm/locks/CarbonLockUtil.java    |   64 --
 .../carbondata/lcm/locks/HdfsFileLock.java      |  120 ---
 .../carbondata/lcm/locks/ICarbonLock.java       |   40 -
 .../carbondata/lcm/locks/LocalFileLock.java     |  164 ---
 .../apache/carbondata/lcm/locks/LockUsage.java  |   35 -
 .../carbondata/lcm/locks/ZooKeeperLocking.java  |  195 ----
 .../carbondata/lcm/locks/ZookeeperInit.java     |   82 --
 .../lcm/status/SegmentStatusManager.java        |  593 ----------
 23 files changed, 2017 insertions(+), 1668 deletions(-)
----------------------------------------------------------------------
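
Note: the new CarbonDataMergerUtil code in this patch gates horizontal compaction on
CarbonProperties (isHorizontalCompactionEnabled(), getNoUpdateDeltaFilesThresholdForIUDCompaction(),
getNoDeleteDeltaFilesThresholdForIUDCompaction()). The sketch below shows how a job might switch
the feature on; the property keys used here are assumptions for illustration and are not taken
from this patch.

    import org.apache.carbondata.core.util.CarbonProperties;

    public class HorizontalCompactionConfigSketch {
      public static void main(String[] args) {
        CarbonProperties props = CarbonProperties.getInstance();
        // Hypothetical property keys -- check CarbonCommonConstants for the exact names.
        props.addProperty("carbon.horizontal.compaction.enable", "true");
        // Compact once a segment accumulates more update/delete delta files than these thresholds.
        props.addProperty("carbon.horizontal.update.compaction.threshold", "1");
        props.addProperty("carbon.horizontal.delete.compaction.threshold", "1");
      }
    }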


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 41709f7..1392bc9 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -41,12 +42,26 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
+import org.apache.carbondata.locks.CarbonLockFactory;
+import org.apache.carbondata.locks.ICarbonLock;
+import org.apache.carbondata.locks.LockUsage;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.spark.load.CarbonLoaderUtil;
 
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
 /**
  * utility class for load merging.
  */
@@ -122,14 +137,400 @@ public final class CarbonDataMergerUtil {
 
   }
 
+
+  /**
+   * Form the Name of the New Merge Folder
+   *
+   * @param segmentToBeMerged
+   * @return
+   */
+  public static String getMergedLoadName(final String segmentToBeMerged) {
+    String firstSegmentName = segmentToBeMerged;
+    if (firstSegmentName.contains(".")) {
+      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
+      String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
+      int fraction = Integer.parseInt(afterDecimal) + 1;
+      String mergedSegmentName = beforeDecimal + "." + fraction;
+      return mergedSegmentName;
+    } else {
+      String mergeName = firstSegmentName + "." + 1;
+      return mergeName;
+    }
+
+  }
+  
+  
+  /**
+   * Update the Table Status file After IUD update delta compaction.
+   * @param loadsToMerge
+   * @param mergeList
+   * @param metaDataFilepath
+   * @param MergedLoadName
+   * @param carbonLoadModel
+   * @return
+   */
+
+  public static boolean updateLoadMetadataIUDFactFileCompaction(
+      List<LoadMetadataDetails> loadsToMerge, List<String> mergeList, String metaDataFilepath,
+      String MergedLoadName, CarbonLoadModel carbonLoadModel) {
+
+    boolean tableStatusUpdationStatus = false;
+
+    Map<String, String> mergeMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    if (mergeList == null) return false;
+
+    for (String mergeElem : mergeList) {
+      String[] strArray = mergeElem.split(",");
+      String orgSegmentname = strArray[0];
+      String mergedSegmentname = strArray[1];
+      mergeMap.put(orgSegmentname, mergedSegmentname);
+    }
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+            + carbonLoadModel.getTableName() + " for table status updation ");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+
+        LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        String mergedLoadNumber = MergedLoadName.substring(
+            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
+                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
+
+        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
+        for (LoadMetadataDetails loadDetail : loadDetails) {
+          // check if this segment is merged.
+          if (loadsToMerge.contains(loadDetail)) {
+            // if the compacted load is deleted after the start of the compaction process,
+            // then need to discard the compaction process and treat it as failed compaction.
+            if (loadDetail.getLoadStatus()
+                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
+                  + " is deleted after the compaction is started.");
+              return tableStatusUpdationStatus;
+            }
+            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
+            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
+            loadDetail.setMergedLoadName(mergeMap.get(loadDetail.getLoadName()));
+          }
+        }
+        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
+
+        // create entry for merged one.
+        for (String mergedList : mergeList) {
+          LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+          loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
+          long loadEnddate = CarbonUpdateUtil.readCurrentTime();
+          loadMetadataDetails.setLoadEndTime(loadEnddate);
+          loadMetadataDetails.setLoadName(mergeMap.get(mergedList.split(",")[0]));
+          loadMetadataDetails
+              .setLoadStartTime(Long.parseLong(carbonLoadModel.getFactTimeStamp()));
+          loadMetadataDetails.setPartitionCount("0");
+          updatedDetailsList.add(loadMetadataDetails);
+        }
+
+        // put the merged folder entry
+        try {
+          segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
+          tableStatusUpdationStatus = true;
+        } catch (IOException e) {
+          LOGGER.error("Error while writing metadata");
+        }
+      } else {
+        LOGGER.error(
+            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + "for table status updation");
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
+            .getDatabaseName() + "." + carbonLoadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + " during table status updation");
+      }
+    }
+    return tableStatusUpdationStatus;
+
+  }
+
+  /**
+   * Update Both Segment Update Status and Table Status for the case of IUD Delete
+   * delta compaction.
+   *
+   * @param loadsToMerge
+   * @param metaDataFilepath
+   * @param carbonLoadModel
+   * @return
+   */
+  public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
+      List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
+      CarbonLoadModel carbonLoadModel) {
+
+    boolean status = false;
+    boolean updateLockStatus = false;
+    boolean tableLockStatus = false;
+
+    String timestamp = carbonLoadModel.getFactTimeStamp();
+
+    List<String> updatedDeltaFilesList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // This routine updateLoadMetadataIUDUpdateDeltaMergeStatus is supposed to update
+    // two files, as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with
+    // the Table Status Metadata file (for update block compaction) it has to update the
+    // Table Update Status Metadata file (for the corresponding delete delta files).
+    // As IUD_UPDDEL_DELTA_COMPACTION writes into the same segment:
+    // A) Table Update Status Metadata file (block level)
+    //      * For each block being compacted, mark 'Compacted' as the status.
+    // B) Table Status Metadata file (segment level)
+    //      * loadStatus won't be changed to 'compacted'.
+    //      * UpdateDeltaStartTime and UpdateDeltaEndTime will both be set to the current
+    //        timestamp (which is passed from the driver).
+    // The Table Update Status Metadata file should be updated first, as we need to get
+    // the updated blocks for the segment from the Update Delta Start and End Timestamps
+    // in the Table Status Metadata.
+
+    // Table Update Status Metadata Update.
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
+    ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
+
+    // Update the Compacted Blocks with Compacted Status.
+    try {
+      updatedDeltaFilesList = segmentUpdateStatusManager
+          .getUpdateDeltaFiles(loadsToMerge.get(0).getLoadName().toString());
+    } catch (Exception e) {
+      LOGGER.error("Error while getting the Update Delta Blocks.");
+      status = false;
+      return status;
+    }
+
+    if (updatedDeltaFilesList.size() > 0) {
+      try {
+        updateLockStatus = updateLock.lockWithRetries();
+        tableLockStatus = statusLock.lockWithRetries();
+
+        List<String> blockNames = new ArrayList<>(updatedDeltaFilesList.size());
+
+        for (String compactedBlocks : updatedDeltaFilesList) {
+          // Extract the block name (strip the path and the trailing timestamp suffix).
+          String fullBlock = compactedBlocks;
+          int endIndex = fullBlock.lastIndexOf(File.separator);
+          String blkNoExt = fullBlock.substring(endIndex + 1, fullBlock.lastIndexOf("-"));
+          blockNames.add(blkNoExt);
+        }
+
+        if (updateLockStatus && tableLockStatus) {
+
+          SegmentUpdateDetails[] updateLists = segmentUpdateStatusManager
+              .readLoadMetadata();
+
+          for (String compactedBlocks : blockNames) {
+            // Check whether the compacted block name matches an existing update detail.
+            for (int i = 0; i < updateLists.length; i++) {
+              if (updateLists[i].getBlockName().equalsIgnoreCase(compactedBlocks)
+                  && !CarbonCommonConstants.COMPACTED.equalsIgnoreCase(updateLists[i].getStatus())
+                  && !CarbonCommonConstants.MARKED_FOR_DELETE
+                  .equalsIgnoreCase(updateLists[i].getStatus())) {
+                updateLists[i].setStatus(CarbonCommonConstants.COMPACTED);
+              }
+            }
+          }
+
+          LoadMetadataDetails[] loadDetails =
+              segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+          for (LoadMetadataDetails loadDetail : loadDetails) {
+            if (loadsToMerge.contains(loadDetail)) {
+              loadDetail.setUpdateDeltaStartTimestamp(timestamp);
+              loadDetail.setUpdateDeltaEndTimestamp(timestamp);
+              if (loadDetail.getLoadName().equalsIgnoreCase("0")) {
+                loadDetail
+                    .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
+              }
+            }
+          }
+
+          try {
+            segmentUpdateStatusManager
+                .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
+            segmentStatusManager
+                .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+            status = true;
+          } catch (IOException e) {
+            LOGGER.error(
+                "Error while writing metadata. The metadata file path is " + carbonTablePath
+                    .getMetadataDirectoryPath());
+            status = false;
+          }
+        } else {
+          LOGGER.error("Not able to acquire the lock.");
+          status = false;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
+            .getMetadataDirectoryPath());
+        status = false;
+
+      } finally {
+        if (updateLockStatus) {
+          if (updateLock.unlock()) {
+            LOGGER.info("Unlock the segment update lock successfully.");
+          } else {
+            LOGGER.error("Not able to unlock the segment update lock.");
+          }
+        }
+        if (tableLockStatus) {
+          if (statusLock.unlock()) {
+            LOGGER.info("Unlock the table status lock successfully.");
+          } else {
+            LOGGER.error("Not able to unlock the table status lock.");
+          }
+        }
+      }
+    }
+    return status;
+  }
+
+  /**
+   * method to update table status in case of IUD Update Delta Compaction.
+   * @param loadsToMerge
+   * @param metaDataFilepath
+   * @param MergedLoadName
+   * @param carbonLoadModel
+   * @param compactionType
+   * @return
+   *//*
   public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
       String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
-      String mergeLoadStartTime, CompactionType compactionType) {
+      CompactionType compactionType) {
 
     boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager();
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+            + carbonLoadModel.getTableName() + " for table status updation ");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+
+        LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        String mergedLoadNumber = MergedLoadName.substring(
+            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
+                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
+
+        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
+        for (LoadMetadataDetails loadDetail : loadDetails) {
+          // check if this segment is merged.
+          if (loadsToMerge.contains(loadDetail)) {
+            // if the compacted load is deleted after the start of the compaction process,
+            // then need to discard the compaction process and treat it as failed compaction.
+            if (loadDetail.getLoadStatus()
+                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
+                  + " is deleted after the compaction is started.");
+              return tableStatusUpdationStatus;
+            }
+            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
+            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
+            loadDetail.setMergedLoadName(mergedLoadNumber);
+          }
+        }
+
+        // create entry for merged one.
+        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
+        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
+        loadMetadataDetails.setLoadEndTime(loadEnddate);
+        loadMetadataDetails.setLoadName(mergedLoadNumber);
+        loadMetadataDetails
+            .setLoadStartTime(Long.parseLong(carbonLoadModel.getFactTimeStamp()));
+        loadMetadataDetails.setPartitionCount("0");
+        // if this is a major compaction then set the segment as major compaction.
+        if (compactionType == CompactionType.MAJOR_COMPACTION) {
+          loadMetadataDetails.setMajorCompacted("true");
+        }
+
+        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
+
+        // put the merged folder entry
+        updatedDetailsList.add(loadMetadataDetails);
+
+        try {
+          segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
+          tableStatusUpdationStatus = true;
+        } catch (IOException e) {
+          LOGGER.error("Error while writing metadata");
+        }
+      } else {
+        LOGGER.error(
+            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + "for table status updation");
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
+            .getDatabaseName() + "." + carbonLoadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + " during table status updation");
+      }
+    }
+    return tableStatusUpdationStatus;
+  }*/
+  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
+      String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
+      long mergeLoadStartTime, CompactionType compactionType) {
+
+    boolean tableStatusUpdationStatus = false;
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
     try {
       if (carbonLock.lockWithRetries()) {
@@ -148,7 +549,7 @@ public final class CarbonDataMergerUtil {
             MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
                 + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
 
-        String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
+        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
         for (LoadMetadataDetails loadDetail : loadDetails) {
           // check if this segment is merged.
           if (loadsToMerge.contains(loadDetail)) {
@@ -160,7 +561,7 @@ public final class CarbonDataMergerUtil {
                   + " is deleted after the compaction is started.");
               return false;
             }
-            loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
+            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
             loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
             loadDetail.setMergedLoadName(mergedLoadNumber);
           }
@@ -170,8 +571,8 @@ public final class CarbonDataMergerUtil {
         LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
         loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
-        String loadEnddate = CarbonLoaderUtil.readCurrentTime();
-        loadMetadataDetails.setTimestamp(loadEnddate);
+        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
+        loadMetadataDetails.setLoadEndTime(loadEnddate);
         loadMetadataDetails.setLoadName(mergedLoadNumber);
         loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
         loadMetadataDetails.setPartitionCount("0");
@@ -316,10 +717,10 @@ public final class CarbonDataMergerUtil {
           first = false;
           continue;
         }
-        String segmentDate = segment.getLoadStartTime();
+        long segmentDate = segment.getLoadStartTime();
         Date segDate2 = null;
         try {
-          segDate2 = sdf.parse(segmentDate);
+          segDate2 = sdf.parse(sdf.format(segmentDate));
         } catch (ParseException e) {
           LOGGER.error("Error while parsing segment start time" + e.getMessage());
         }
@@ -351,10 +752,10 @@ public final class CarbonDataMergerUtil {
    */
   private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
       LoadMetadataDetails segment, SimpleDateFormat sdf) {
-    String baselineLoadStartTime = segment.getLoadStartTime();
+    long baselineLoadStartTime = segment.getLoadStartTime();
     Date segDate1 = null;
     try {
-      segDate1 = sdf.parse(baselineLoadStartTime);
+      segDate1 = sdf.parse(sdf.format(baselineLoadStartTime));
     } catch (ParseException e) {
       LOGGER.error("Error while parsing segment start time" + e.getMessage());
     }
@@ -640,6 +1041,57 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
+   * This method returns the valid segments attached to the table Identifier.
+   *
+   * @param absoluteTableIdentifier
+   * @return
+   */
+  public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
+          throws IOException {
+
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
+    try {
+      validAndInvalidSegments =
+              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+    } catch (IOException e) {
+      LOGGER.error("Error while getting valid segment list for a table identifier");
+      throw new IOException();
+    }
+    return validAndInvalidSegments.getValidSegments();
+  }
+
+  /**
+   * Combines the list of maps into one map.
+   *
+   * @param mapsOfNodeBlockMapping
+   * @return
+   */
+  public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(
+      List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) {
+
+    Map<String, List<TableBlockInfo>> combinedMap =
+        new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // traverse list of maps.
+    for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) {
+      // traverse inside each map.
+      for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) {
+
+        String node = eachEntry.getKey();
+        List<TableBlockInfo> blocks = eachEntry.getValue();
+
+        // if that node's details already exist in the combined map.
+        if (null != combinedMap.get(node)) {
+          List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node);
+          blocksAlreadyPresent.addAll(blocks);
+        } else { // if it is not present in the map then add it.
+          combinedMap.put(node, blocks);
+        }
+      }
+    }
+    return combinedMap;
+  }
+
+  /**
    * Removing the already merged segments from list.
    */
   public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
@@ -656,4 +1108,547 @@ public final class CarbonDataMergerUtil {
 
   }
 
+  /**
+   * method to identify the segments qualified for merging in case of IUD Compaction.
+   *
+   * @param carbonLoadModel
+   * @param compactionType
+   * @return
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD(
+          List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel,
+          CompactionType compactionType) {
+
+    List<LoadMetadataDetails> validSegments = new ArrayList<>(segments.size());
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+            carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    if (compactionType.equals(CompactionType.IUD_FACTFILE_COMPACTION)) {
+      /**
+       * Check each segment in a loop to see whether it qualifies for IUD compaction,
+       * i.e. whether the number of delete delta files or the size of the cumulative
+       * delete delta exceeds the threshold.
+       */
+      int numberDeleteDeltaFilesThreshold =
+              CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
+      for (LoadMetadataDetails seg : segments) {
+        if (isSegmentValid(seg) && checkDeleteDeltaFilesInSeg(seg.getLoadName(),
+                carbonLoadModel.getSegmentUpdateStatusManager(), numberDeleteDeltaFilesThreshold)) {
+          validSegments.add(seg);
+        }
+      }
+    } else if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+      int numberUpdateDeltaFilesThreshold =
+              CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
+      for (LoadMetadataDetails seg : segments) {
+        if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(),
+                absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
+                numberUpdateDeltaFilesThreshold)) {
+          validSegments.add(seg);
+        }
+      }
+    }
+    return validSegments;
+  }
+
+  private static boolean isSegmentValid(LoadMetadataDetails seg) {
+    return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+            || seg.getLoadStatus()
+            .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
+            .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
+  }
+  /**
+   * Method that gets the list of segments which qualify for IUD compaction.
+   * @param Segments
+   * @param absoluteTableIdentifier
+   * @param compactionTypeIUD
+   * @return
+   */
+  public static List<String> getSegListIUDCompactionQualified(List<String> Segments,
+                                                              AbsoluteTableIdentifier absoluteTableIdentifier,
+                                                              SegmentUpdateStatusManager segmentUpdateStatusManager,
+                                                              CompactionType compactionTypeIUD) {
+
+    List<String> validSegments = new ArrayList<>();
+
+    if (compactionTypeIUD.equals(CompactionType.IUD_FACTFILE_COMPACTION)) {
+      int numberDeleteDeltaFilesThreshold =
+              CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
+      for (String seg : Segments) {
+        if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
+                numberDeleteDeltaFilesThreshold)) {
+          validSegments.add(seg);
+        }
+      }
+    } else if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
+      int numberDeleteDeltaFilesThreshold =
+              CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
+      List<String> deleteSegments = new ArrayList<>();
+      for (String seg : Segments) {
+        if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
+                numberDeleteDeltaFilesThreshold)) {
+          deleteSegments.add(seg);
+        }
+      }
+      if (deleteSegments.size() > 0) {
+        // This code block appends the block names selected for merge to the segment name
+        // instead of taking only the segment name. This helps to parallelize the work per
+        // block in case of delete horizontal compaction.
+        for (String segName : deleteSegments) {
+          List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
+                  numberDeleteDeltaFilesThreshold);
+          if (tempSegments != null) {
+            for (String tempSeg : tempSegments) {
+              validSegments.add(tempSeg);
+            }
+          }
+        }
+      }
+    } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+      int numberUpdateDeltaFilesThreshold =
+              CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
+      for (String seg : Segments) {
+        if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier,segmentUpdateStatusManager,
+                numberUpdateDeltaFilesThreshold)) {
+          validSegments.add(seg);
+        }
+      }
+    }
+    return validSegments;
+  }
+
+  /**
+   * Check if the blockname of the segment belongs to the Valid Update Delta List or not.
+   * @param seg
+   * @param blkName
+   * @param segmentUpdateStatusManager
+   * @return
+   */
+  public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName,
+                                                   SegmentUpdateStatusManager segmentUpdateStatusManager) {
+
+    List<String> list = segmentUpdateStatusManager.getUpdateDeltaFiles(seg);
+
+    String fullBlock = blkName;
+    String[] FileParts = fullBlock.split(CarbonCommonConstants.FILE_SEPARATOR);
+    String blockName = FileParts[FileParts.length - 1];
+
+    for (String str : list) {
+      if (str.contains(blockName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * This method traverses the update delta files inside the segment and returns true
+   * if their count exceeds the IUD compaction threshold.
+   *
+   * @param seg
+   * @param absoluteTableIdentifier
+   * @param segmentUpdateStatusManager
+   * @param numberDeltaFilesThreshold
+   * @return
+   */
+  public static Boolean checkUpdateDeltaFilesInSeg(String seg,
+                                                   AbsoluteTableIdentifier absoluteTableIdentifier,
+                                                   SegmentUpdateStatusManager segmentUpdateStatusManager,
+                                                   int numberDeltaFilesThreshold) {
+
+    CarbonFile[] updateDeltaFiles = null;
+    Set<String> uniqueBlocks = new HashSet<String>();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
+    CarbonFile segDir =
+            FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+    CarbonFile[] allSegmentFiles = segDir.listFiles();
+
+    updateDeltaFiles = segmentUpdateStatusManager
+            .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
+                    false, allSegmentFiles);
+
+    if (updateDeltaFiles == null) {
+      return false;
+    }
+
+    // The update delta files may contain spill-over blocks; multiple spill-over blocks are
+    // counted as one. The updateDeltaFiles array contains the update delta block names that
+    // lie within the update delta start and end timestamps. To eliminate spill-over blocks,
+    // only files with a unique task ID and timestamp are counted.
+    for (CarbonFile blocks : updateDeltaFiles) {
+      // Get Task ID and the Timestamp from the Block name for e.g.
+      // part-0-3-1481084721319.carbondata => "3-1481084721319"
+      String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+      String timestamp =
+              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+      String taskAndTimeStamp = task + "-" + timestamp;
+      uniqueBlocks.add(taskAndTimeStamp);
+    }
+    if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Check whether the passed segment qualifies for IUD delete delta compaction, i.e.
+   * whether the number of delete delta files present in the segment exceeds
+   * numberDeltaFilesThreshold.
+   *
+   * @param seg
+   * @param segmentUpdateStatusManager
+   * @param numberDeltaFilesThreshold
+   * @return
+   */
+  private static boolean checkDeleteDeltaFilesInSeg(String seg,
+                                                    SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+
+    Set<String> uniqueBlocks = new HashSet<String>();
+    List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+
+    for (final String blockName : blockNameList) {
+
+      CarbonFile[] deleteDeltaFiles = segmentUpdateStatusManager
+              .getDeleteDeltaFilesList(seg, blockName);
+
+      // The delete delta files may contain spill-over blocks; multiple spill-over blocks are
+      // counted as one. The deleteDeltaFiles array contains the delete delta block names that
+      // lie within the delete delta start and end timestamps. To eliminate spill-over blocks,
+      // only files with a unique task ID and timestamp are counted.
+      for (CarbonFile blocks : deleteDeltaFiles) {
+        // Get Task ID and the Timestamp from the Block name for e.g.
+        // part-0-3-1481084721319.carbondata => "3-1481084721319"
+        String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+        String timestamp =
+                CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+        String taskAndTimeStamp = task + "-" + timestamp;
+        uniqueBlocks.add(taskAndTimeStamp);
+      }
+
+      if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the blocks of the passed segment whose number of delete delta files
+   * exceeds numberDeltaFilesThreshold, so that they can be picked up for IUD
+   * delete delta compaction.
+   * @param seg
+   * @param segmentUpdateStatusManager
+   * @param numberDeltaFilesThreshold
+   * @return
+   */
+
+  private static List<String> getDeleteDeltaFilesInSeg(String seg,
+                                                       SegmentUpdateStatusManager segmentUpdateStatusManager,
+                                                       int numberDeltaFilesThreshold) {
+
+    List<String> blockLists = new ArrayList<>();
+
+    List<String> blockNameList =
+            segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+
+    for (final String blockName : blockNameList) {
+
+      CarbonFile[] deleteDeltaFiles = segmentUpdateStatusManager
+              .getDeleteDeltaFilesList(seg, blockName);
+
+      if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
+        blockLists.add(seg + "/" + blockName);
+      }
+    }
+    return blockLists;
+  }
+
+  /**
+   * Returns true if horizontal compaction is enabled.
+   * @return
+   */
+  public static boolean isHorizontalCompactionEnabled() {
+    if ((CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled,
+                    CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)).equalsIgnoreCase("true")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * method to compact Delete Delta files in case of IUD Compaction.
+   *
+   * @param seg
+   * @param blockName
+   * @param absoluteTableIdentifier
+   * @param segmentUpdateDetails
+   * @param timestamp
+   * @return
+   * @throws IOException
+   */
+  public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg,
+                                                                              String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
+                                                                              SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
+
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+            new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
+
+    // set the update status.
+    segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
+
+    CarbonFile[] deleteDeltaFiles = segmentUpdateStatusManager
+            .getDeleteDeltaFilesList(seg, blockName);
+
+    String destFileName =
+            blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
+    String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
+            + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
+
+    List<String> deleteFilePathList = new ArrayList<String>();
+    for (CarbonFile cFile : deleteDeltaFiles) {
+      deleteFilePathList.add(cFile.getCanonicalPath());
+    }
+
+    CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
+    blockDetails.setBlockName(blockName);
+    blockDetails.setSegmentName(seg);
+    blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString());
+    blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString());
+
+    try {
+      if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) {
+        blockDetails.setCompactionStatus(true);
+      } else {
+        blockDetails.setCompactionStatus(false);
+      }
+      resultList.add(blockDetails);
+    } catch (IOException e) {
+      LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
+              + fullBlockFilePath);
+      throw new IOException();
+    }
+    return resultList;
+  }
+
+  /**
+   * This method compacts the delete delta files.
+   * @param deleteDeltaFiles
+   * @param blockName
+   * @param fullBlockFilePath
+   * @return
+   */
+  public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles,
+                                                        String blockName, String fullBlockFilePath) throws IOException {
+
+    DeleteDeltaBlockDetails deleteDeltaBlockDetails = null;
+    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
+    try {
+      deleteDeltaBlockDetails =
+              dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
+    } catch (Exception e) {
+      String blockFilePath = fullBlockFilePath
+              .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+      LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath);
+      throw new IOException();
+    }
+    CarbonDeleteDeltaWriterImpl carbonDeleteWriter =
+            new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
+                    FileFactory.getFileType(fullBlockFilePath));
+    try {
+      carbonDeleteWriter.write(deleteDeltaBlockDetails);
+    } catch (IOException e) {
+      LOGGER.error("Error while writing compacted delete delta file " + fullBlockFilePath);
+      throw new IOException();
+    }
+    return true;
+  }
+
+  public static Boolean updateStatusFile(
+          List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
+          String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
+
+    List<SegmentUpdateDetails> segmentUpdateDetails =
+            new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
+
+
+    // Build a SegmentUpdateDetails entry for every block that was successfully compacted.
+    for (CarbonDataMergerUtilResult carbonDataMergerUtilResult : updateDataMergerDetailsList) {
+      if (carbonDataMergerUtilResult.getCompactionStatus()) {
+        SegmentUpdateDetails tempSegmentUpdateDetails = new SegmentUpdateDetails();
+        tempSegmentUpdateDetails.setSegmentName(carbonDataMergerUtilResult.getSegmentName());
+        tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName());
+
+        for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager
+                .getUpdateStatusDetails()) {
+          if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName())
+                  && origDetails.getSegmentName()
+                  .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
+
+            tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock());
+            tempSegmentUpdateDetails.setStatus(origDetails.getStatus());
+            break;
+          }
+        }
+
+        tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp(
+                carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
+        tempSegmentUpdateDetails
+                .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
+
+        segmentUpdateDetails.add(tempSegmentUpdateDetails);
+      } else return false;
+    }
+
+    CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
+
+    // Update the Table Status.
+    String metaDataFilepath = table.getMetaDataFilepath();
+    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+
+    boolean lockStatus = false;
+
+    try {
+      lockStatus = carbonLock.lockWithRetries();
+      if (lockStatus) {
+        LOGGER.info(
+                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                        + " for table status updation");
+
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+          if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
+            loadMetadata.setUpdateStatusFileName(
+                    CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
+          }
+        }
+        try {
+          segmentStatusManager
+                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        } catch (IOException e) {
+          return false;
+        }
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
+                .getDatabaseName() + "." + table.getFactTableName());
+      }
+    } finally {
+      if (lockStatus) {
+        if (carbonLock.unlock()) {
+          LOGGER.info(
+                  "Table unlocked successfully after table status updation" + table.getDatabaseName()
+                          + "." + table.getFactTableName());
+        } else {
+          LOGGER.error(
+                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+                          .getFactTableName() + " during table status updation");
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * This will update the property of segments as major compacted.
+   * @param model
+   * @param changedSegDetails
+   */
+  public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,
+                                                            List<LoadMetadataDetails> changedSegDetails,
+                                                            List<LoadMetadataDetails> preservedSegment) throws Exception {
+
+    String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+            model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
+    List<LoadMetadataDetails> originalList = Arrays.asList(details);
+    for (LoadMetadataDetails segment : changedSegDetails) {
+      if(preservedSegment.contains(segment)) {
+        continue;
+      }
+      originalList.get(originalList.indexOf(segment)).setMajorCompacted("true");
+
+    }
+
+
+    ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
+            model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
+            LockUsage.TABLE_STATUS_LOCK);
+
+    try {
+      if (carbonTableStatusLock.lockWithRetries()) {
+        LOGGER.info(
+                "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
+                        + " for table status updation ");
+        CarbonTablePath carbonTablePath = CarbonStorePath
+                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                        absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
+                originalList.toArray(new LoadMetadataDetails[originalList.size()]));
+      } else {
+        LOGGER.error(
+                "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
+                        .getTableName() + "for table status updation");
+        throw new Exception("Failed to update the MajorCompactionStatus.");
+      }
+    } catch (IOException e) {
+      LOGGER.error("Error while writing metadata");
+      throw new Exception("Failed to update the MajorCompactionStatus." + e.getMessage());
+    } finally {
+      if (carbonTableStatusLock.unlock()) {
+        LOGGER.info(
+                "Table unlocked successfully after table status updation" + model.getDatabaseName()
+                        + "." + model.getTableName());
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
+                .getTableName() + " during table status updation");
+      }
+    }
+
+  }
+
+  /**
+   * Returns true if the passed block name is not present in the invalid blocks list.
+   * @param blkName
+   * @param invalidBlocks
+   * @return
+   */
+  public static Boolean checkUpdateDeltaAndFactMatchBlock(String blkName,
+                                                          List<String> invalidBlocks) {
+
+    String fullBlock = blkName;
+    String[] FileParts = fullBlock.split(CarbonCommonConstants.FILE_SEPARATOR);
+    String blockName = FileParts[FileParts.length - 1];
+
+    if (!invalidBlocks.contains(blockName)) {
+      return true;
+    }
+
+    return false;
+  }
+
 }
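
For readers skimming the diff, the naming rule in the new getMergedLoadName() above is small
enough to show standalone. A minimal sketch mirroring the logic added in this patch (the wrapper
class and main method are only for illustration):

    public class MergedLoadNameSketch {
      // Mirrors CarbonDataMergerUtil.getMergedLoadName(): append ".1" the first time a
      // segment is merged, otherwise increment the existing fractional suffix.
      static String mergedLoadName(String segment) {
        if (segment.contains(".")) {
          String beforeDecimal = segment.substring(0, segment.indexOf("."));
          int fraction = Integer.parseInt(segment.substring(segment.indexOf(".") + 1)) + 1;
          return beforeDecimal + "." + fraction;   // e.g. "2.1" -> "2.2"
        }
        return segment + ".1";                     // e.g. "2"   -> "2.1"
      }

      public static void main(String[] args) {
        System.out.println(mergedLoadName("2"));    // prints 2.1
        System.out.println(mergedLoadName("2.1"));  // prints 2.2
      }
    }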

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java
new file mode 100644
index 0000000..4511f90
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java
@@ -0,0 +1,12 @@
+package org.apache.carbondata.spark.merger;
+
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+
+public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails {
+  private boolean compactionStatus;
+
+  public boolean getCompactionStatus() {return compactionStatus;}
+
+  public void setCompactionStatus(Boolean status) {compactionStatus = status;}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
index d735b44..c44b94f 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
@@ -20,9 +20,15 @@ package org.apache.carbondata.spark.merger;
 
 /**
  * This enum is used to define the types of Compaction.
- * We have 2 types. one is Minor and another is Major
+ * We have three types: one is Minor, another is Major, and
+ * finally a compaction performed after UPDATE/DELETE operations,
+ * called IUD compaction.
  */
 public enum CompactionType {
     MINOR_COMPACTION,
-    MAJOR_COMPACTION
+    MAJOR_COMPACTION,
+    IUD_FACTFILE_COMPACTION,
+    IUD_UPDDEL_DELTA_COMPACTION,
+    IUD_DELETE_DELTA_COMPACTION,
+    NONE
 }
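
The enum now separates the classic segment merges from the IUD variants. A hypothetical dispatch
sketch (the helper class is not part of this change; the descriptions paraphrase the comments
elsewhere in the patch):

    import org.apache.carbondata.spark.merger.CompactionType;

    public class CompactionDispatchSketch {
      static String describe(CompactionType type) {
        switch (type) {
          case MINOR_COMPACTION:
          case MAJOR_COMPACTION:
            return "merge whole segments into a new segment";
          case IUD_UPDDEL_DELTA_COMPACTION:
            return "merge update delta files back into the same segment";
          case IUD_DELETE_DELTA_COMPACTION:
            return "merge delete delta files block by block (horizontal compaction)";
          case IUD_FACTFILE_COMPACTION:
            return "rewrite fact files of segments with many delete deltas";
          default:
            return "no compaction";
        }
      }

      public static void main(String[] args) {
        System.out.println(describe(CompactionType.IUD_UPDDEL_DELTA_COMPACTION));
      }
    }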

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
index 5c534fa..324abbf 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -18,16 +18,6 @@
  */
 package org.apache.carbondata.spark.merger;
 
-import java.io.File;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
@@ -41,6 +31,7 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -55,6 +46,9 @@ import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterE
 import org.apache.carbondata.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
 
+import java.io.File;
+import java.util.*;
+
 /**
  * This is the Merger class responsible for the merging of the segments.
  */
@@ -80,7 +74,9 @@ public class RowResultMerger {
 
   public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
       String tableName, SegmentProperties segProp, String tempStoreLocation,
-      CarbonLoadModel loadModel, int[] colCardinality) {
+      CarbonLoadModel loadModel, int[] colCardinality, CompactionType compactionType) {
+
+    CarbonDataFileAttributes carbonDataFileAttributes;
 
     this.rawResultIteratorList = iteratorList;
     // create the List of RawResultIterator.
@@ -106,9 +102,23 @@ public class RowResultMerger {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         getCarbonFactDataHandlerModel(loadModel);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
-    CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
-            loadModel.getFactTimeStamp());
+
+    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+      int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+          CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
+              carbonTable.getCarbonTableIdentifier()));
+
+      // In IUD_UPDDEL_DELTA_COMPACTION the new file is written into the same segment,
+      // so the task number must be one greater than the current maximum in that segment.
+      int index = taskNo + 1;
+      carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
+    }
+    else {
+      carbonDataFileAttributes =
+          new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+              loadModel.getFactTimeStamp());
+    }
+
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
     if (segProp.getNumberOfNoDictionaryDimension() > 0
         || segProp.getComplexDimensions().size() > 0) {
@@ -130,10 +140,9 @@ public class RowResultMerger {
   public boolean mergerSlice() {
     boolean mergeStatus = false;
     int index = 0;
+    boolean isDataPresent = false;
     try {
 
-      dataHandler.initialise();
-
       // add all iterators to the queue
       for (RawResultIterator leaftTupleIterator : this.rawResultIteratorList) {
         this.recordHolderHeap.add(leaftTupleIterator);
@@ -145,7 +154,12 @@ public class RowResultMerger {
         iterator = this.recordHolderHeap.poll();
         Object[] convertedRow = iterator.next();
         if(null == convertedRow){
-          throw new SliceMergerException("Unable to generate mdkey during compaction.");
+          index--;
+          continue;
+        }
+        if (!isDataPresent) {
+          dataHandler.initialise();
+          isDataPresent = true;
         }
         // get the mdkey
         addRow(convertedRow);
@@ -164,7 +178,12 @@ public class RowResultMerger {
       while (true) {
         Object[] convertedRow = iterator.next();
         if(null == convertedRow){
-          throw new SliceMergerException("Unable to generate mdkey during compaction.");
+          break;
+        }
+        // do it only once
+        if (!isDataPresent) {
+          dataHandler.initialise();
+          isDataPresent = true;
         }
         addRow(convertedRow);
         // check if leaf contains no record
@@ -172,14 +191,19 @@ public class RowResultMerger {
           break;
         }
       }
-      this.dataHandler.finish();
+      if (isDataPresent) {
+        this.dataHandler.finish();
+      }
       mergeStatus = true;
     } catch (Exception e) {
       LOGGER.error("Exception in compaction merger " + e.getMessage());
       mergeStatus = false;
     } finally {
       try {
-        this.dataHandler.closeHandler();
+        if (isDataPresent) {
+          this.dataHandler.closeHandler();
+        }
       } catch (CarbonDataWriterException e) {
         LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
         mergeStatus = false;

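A minimal sketch of the task-number rule implemented above: for IUD_UPDDEL_DELTA_COMPACTION the merged data file stays in the original segment, so its task number is taken as one more than the highest task id already present there, while other compaction types keep the task number from the load model. The class, method, and parameter names below are hypothetical, not the patch's API.

import org.apache.carbondata.spark.merger.CompactionType;

final class TaskNoSketch {

  // latestTaskIdInSegment stands in for the value returned by
  // CarbonUpdateUtil.getLatestTaskIdForSegment(...) in the patch.
  static int chooseTaskNo(CompactionType compactionType,
                          int latestTaskIdInSegment,
                          String taskNoFromLoadModel) {
    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
      // the compacted file is written into the same segment, so it must not
      // collide with any task id already used in that segment
      return latestTaskIdInSegment + 1;
    }
    // other compactions write into a new segment; keep the load model's task no
    return Integer.parseInt(taskNoFromLoadModel);
  }
}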
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 01edf4a..2ff426b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -162,7 +162,8 @@ class CarbonMergerRDD[K, V](
             segmentProperties,
             tempStoreLoc,
             carbonLoadModel,
-            carbonMergerMapping.maxSegmentColCardinality
+            carbonMergerMapping.maxSegmentColCardinality,
+            carbonMergerMapping.campactionType
           )
         mergeStatus = merger.mergerSlice()
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index fe805fe..6eb11c7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -17,12 +17,14 @@
 
 package org.apache.carbondata.spark.rdd
 
+import org.apache.carbondata.core.update.CarbonUpdateUtil
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.spark.MergeResultImpl
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
@@ -53,7 +55,7 @@ object Compactor {
     val factTableName = carbonLoadModel.getTableName
     val validSegments: Array[String] = CarbonDataMergerUtil
       .getValidSegments(loadsToMerge).split(',')
-    val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
+    val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
     val carbonMergerMapping = CarbonMergerMapping(storeLocation,
       storePath,
       carbonTable.getMetaDataFilepath,
@@ -64,6 +66,7 @@ object Compactor {
       factTableName,
       validSegments,
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+      compactionType,
       maxSegmentColCardinality = null,
       maxSegmentColumnSchemaList = null
     )

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index c2f06a4..8eca81a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -20,19 +20,27 @@ package org.apache.carbondata.spark.rdd
 import java.util
 import java.util.concurrent._
 
+import org.apache.carbondata.core.update.CarbonUpdateUtil
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.locks.{LockUsage, CarbonLockUtil, CarbonLockFactory}
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
@@ -44,6 +52,105 @@ object DataManagementFunc {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
+  def deleteLoadByDate(
+      sqlContext: SQLContext,
+      schema: CarbonDataLoadSchema,
+      databaseName: String,
+      tableName: String,
+      storePath: String,
+      dateField: String,
+      dateFieldActualName: String,
+      dateValue: String) {
+
+    val sc = sqlContext
+    // Delete the loads based on the given date value
+    val table = CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)
+    val loadMetadataDetailsArray =
+      SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
+    val resultMap = new CarbonDeleteLoadByDateRDD(
+      sc.sparkContext,
+      new DeletedLoadResultImpl(),
+      databaseName,
+      table.getDatabaseName,
+      dateField,
+      dateFieldActualName,
+      dateValue,
+      table.getFactTableName,
+      tableName,
+      storePath,
+      loadMetadataDetailsArray).collect.groupBy(_._1)
+
+    var updatedLoadMetadataDetailsList = new ListBuffer[LoadMetadataDetails]()
+    if (resultMap.nonEmpty) {
+      if (resultMap.size == 1) {
+        if (resultMap.contains("")) {
+          LOGGER.error("Delete by Date request is failed")
+          sys.error("Delete by Date request is failed, potential causes " +
+              "Empty store or Invalid column type, For more details please refer logs.")
+        }
+      }
+      val updatedloadMetadataDetails = loadMetadataDetailsArray.map { elem => {
+        var statusList = resultMap.get(elem.getLoadName)
+        // check for the merged load folder.
+        if (statusList.isEmpty && null != elem.getMergedLoadName) {
+          statusList = resultMap.get(elem.getMergedLoadName)
+        }
+
+        if (statusList.isDefined) {
+          elem.setModificationOrdeletionTimesStamp(elem.getTimeStamp(CarbonLoaderUtil
+            .readCurrentTime()))
+          // if at least one CarbonCommonConstants.MARKED_FOR_UPDATE status exists,
+          // use MARKED_FOR_UPDATE
+          if (statusList.get
+              .forall(status => status._2 == CarbonCommonConstants.MARKED_FOR_DELETE)) {
+            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+          } else {
+            elem.setLoadStatus(CarbonCommonConstants.MARKED_FOR_UPDATE)
+            updatedLoadMetadataDetailsList += elem
+          }
+          elem
+        } else {
+          elem
+        }
+      }
+
+      }
+
+      // Save the load metadata
+      val carbonLock = CarbonLockFactory
+          .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.METADATA_LOCK
+          )
+      try {
+        if (carbonLock.lockWithRetries()) {
+          LOGGER.info("Successfully got the table metadata file lock")
+          if (updatedLoadMetadataDetailsList.nonEmpty) {
+            // TODO: Load Aggregate tables after retention.
+          }
+
+          // write
+          CarbonLoaderUtil.writeLoadMetadata(
+            schema,
+            databaseName,
+            table.getDatabaseName,
+            updatedloadMetadataDetails.asJava
+          )
+        }
+      } finally {
+        if (carbonLock.unlock()) {
+          LOGGER.info("unlock the table metadata file successfully")
+        } else {
+          LOGGER.error("Unable to unlock the metadata lock")
+        }
+      }
+    } else {
+      LOGGER.error("Delete by Date request is failed")
+      LOGGER.audit(s"The delete load by date is failed for $databaseName.$tableName")
+      sys.error("Delete by Date request is failed, potential causes " +
+          "Empty store or Invalid column type, For more details please refer logs.")
+    }
+  }
+
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
       storePath: String,
       compactionModel: CompactionModel,
@@ -168,7 +275,7 @@ object DataManagementFunc {
     newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     newCarbonLoadModel.setStorePath(table.getStorePath)
     CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
-    val loadStartTime = CarbonLoaderUtil.readCurrentTime()
+    val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     newCarbonLoadModel.setFactTimeStamp(loadStartTime)
   }
 
@@ -260,6 +367,7 @@ object DataManagementFunc {
         LOGGER.audit(errorMsg)
         LOGGER.error(errorMsg)
         throw new Exception(errorMsg + " Please try after some time.")
+
       }
     } finally {
       CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)

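The deleteLoadByDate change above follows a lock-then-write pattern for the load metadata (table status) file: acquire the metadata lock with retries, rewrite the metadata only if the lock was obtained, and always release the lock in a finally block. Below is a minimal, self-contained sketch of that pattern, in Java for illustration; the CarbonLock interface here is only a stand-in for the project's ICarbonLock, and all names are assumptions rather than the patch's API.

// Illustrative only: mirrors the lockWithRetries / write / unlock flow above.
final class MetadataWriteSketch {

  // stand-in for the carbondata lock interface; not the real type
  interface CarbonLock {
    boolean lockWithRetries();
    boolean unlock();
  }

  static void writeUnderLock(CarbonLock metadataLock, Runnable writeLoadMetadata) {
    try {
      if (metadataLock.lockWithRetries()) {
        // only the lock holder rewrites the load metadata file
        writeLoadMetadata.run();
      }
    } finally {
      if (!metadataLock.unlock()) {
        System.err.println("Unable to release the table metadata lock");
      }
    }
  }
}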
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f9fb1b91/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8504fc8..af69652 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -20,6 +20,10 @@ package org.apache.carbondata.spark.rdd
 import java.util
 import java.util.concurrent._
 
+import org.apache.carbondata.core.update.CarbonUpdateUtil
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory, ICarbonLock}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
@@ -45,8 +49,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
@@ -90,7 +92,7 @@ object CarbonDataRDDFactory {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
     }
     // reading the start time of data load.
-    val loadStartTime = CarbonLoaderUtil.readCurrentTime()
+    val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     carbonLoadModel.setFactTimeStamp(loadStartTime)
 
     val isCompactionTriggerByDDl = true
@@ -472,7 +474,7 @@ object CarbonDataRDDFactory {
       }
 
       // reading the start time of data load.
-      val loadStartTime = CarbonLoaderUtil.readCurrentTime()
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime()
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       val tableCreationTime = CarbonEnv.get.carbonMetastore
           .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)

