carbondata-commits mailing list archives

From: gvram...@apache.org
Subject: [19/22] incubator-carbondata git commit: IUD horizontal compaction of update and delete delta files support
Date: Fri, 06 Jan 2017 13:57:19 GMT
IUD horizontal compaction of update and delete delta files support


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/cdae9ed4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/cdae9ed4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/cdae9ed4

Branch: refs/heads/master
Commit: cdae9ed4d1ac37422d3e1d1d7257b753a29afbc2
Parents: d0b4a98
Author: sounakr <sounakr@gmail.com>
Authored: Mon Jan 2 12:36:11 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../scan/executor/infos/BlockExecutionInfo.java |  14 +
 .../scan/result/AbstractScannedResult.java      |  19 +
 .../scan/scanner/AbstractBlockletScanner.java   |   9 +
 .../carbondata/spark/load/CarbonLoaderUtil.java |  31 ++
 .../spark/merger/CarbonDataMergerUtil.java      | 366 +++----------------
 .../carbondata/spark/merger/CompactionType.java |   1 -
 .../org/apache/carbondata/spark/KeyVal.scala    |   6 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          | 132 +++++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 130 +++++--
 .../apache/carbondata/spark/rdd/Compactor.scala |  47 ++-
 .../spark/rdd/DataManagementFunc.scala          |  32 +-
 .../execution/command/carbonTableSchema.scala   |   4 +-
 .../sql/execution/command/IUDCommands.scala     |  60 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  30 +-
 14 files changed, 473 insertions(+), 408 deletions(-)
----------------------------------------------------------------------
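
The diffs below wire delete/update delta awareness into the scan path and add the horizontal-compaction entry points. As a rough, hedged sketch of the idea only (file naming and the merged-file handling here are illustrative, not the on-disk CarbonData layout): once the delta files accumulated against a block cross a configured threshold, horizontal compaction merges them into one delta file so queries read a single delta per block.

    def horizontallyCompact(deltaFilesPerBlock: Map[String, Seq[String]],
        threshold: Int,
        mergeTimestamp: Long): Map[String, Seq[String]] =
      deltaFilesPerBlock.map { case (blockName, deltaFiles) =>
        if (deltaFiles.size > threshold) {
          // many small delete deltas for one block collapse into a single merged file
          blockName -> Seq(s"$blockName-$mergeTimestamp.deletedelta")
        } else {
          blockName -> deltaFiles   // below the threshold: left untouched
        }
      }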


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
index 2d31448..524ed76 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.scan.executor.infos;
 import java.util.Map;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
@@ -200,6 +201,19 @@ public class BlockExecutionInfo {
   private boolean vectorBatchCollector;
 
   /**
+   * absolute table identifier
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  }
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
index bb2d132..8f22f89 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
@@ -25,6 +25,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import org.apache.carbondata.common.iudprocessor.cache.BlockletLevelDeleteDeltaDataCache;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
@@ -111,6 +112,8 @@ public abstract class AbstractScannedResult {
    */
   private int[] complexParentBlockIndexes;
 
+  protected BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache;
+
   public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
     this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
     this.noDictionaryColumnBlockIndexes = blockExecutionInfo.getNoDictionaryBlockIndexes();
@@ -544,4 +547,20 @@ public abstract class AbstractScannedResult {
    * @return measure value
    */
   public abstract BigDecimal getBigDecimalMeasureValue(int ordinal);
+
+  /**
+   *
+   * @return BlockletLevelDeleteDeltaDataCache.
+   */
+  public BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache() {
+    return blockletDeleteDeltaCache;
+  }
+
+  /**
+   * @param blockletDeleteDeltaCache
+   */
+  public void setBlockletDeleteDeltaCache(
+      BlockletLevelDeleteDeltaDataCache blockletDeleteDeltaCache) {
+    this.blockletDeleteDeltaCache = blockletDeleteDeltaCache;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
index 325fac8..f97641e 100644
--- a/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.scan.scanner;
 
 import java.io.IOException;
 
+import org.apache.carbondata.common.iudprocessor.iuddata.BlockletDeleteDeltaCacheLoader;
+import org.apache.carbondata.common.iudprocessor.iuddata.DeleteDeltaCacheLoaderIntf;
 import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
 import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstants;
 import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsModel;
@@ -81,5 +83,12 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
     scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
             .getMeasureChunks(blocksChunkHolder.getFileReader(),
                 blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
+    // loading delete data cache in blockexecutioninfo instance
+    DeleteDeltaCacheLoaderIntf deleteCacheLoader =
+        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
+            blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
+    deleteCacheLoader.loadDeleteDeltaFileDataToCache();
+    scannedResult
+        .setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
   }
 }
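
A hedged sketch of what attaching the blocklet-level delete delta cache to the scanned result enables: deleted row ids can be skipped while the result is iterated. The cache type and lookup below are assumptions for illustration, not the actual BlockletLevelDeleteDeltaDataCache API.

    class BlockletDeleteDeltaCache(deletedRowIds: Set[Int]) {
      def contains(rowId: Int): Boolean = deletedRowIds.contains(rowId)
    }

    // rows that survive after applying the delete delta of one blocklet
    def survivingRows(rowCount: Int, cache: BlockletDeleteDeltaCache): Seq[Int] =
      (0 until rowCount).filterNot(cache.contains)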

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index b2951b4..5f0e954 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -51,6 +51,7 @@ import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -63,7 +64,9 @@ import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.update.UpdateVO;
 import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.fileoperations.AtomicFileOperations;
@@ -937,6 +940,34 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * @param tableInfo
+   * @param invalidBlockVOForSegmentId
+   */
+  public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
+      UpdateVO invalidBlockVOForSegmentId,
+      SegmentUpdateStatusManager updateStatusMngr) {
+
+    if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
+            CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath
+                    .getCarbonDataExtension())) {
+      return true;
+    }
+
+    UpdateVO updatedVODetails = invalidBlockVOForSegmentId;
+    if (null != updatedVODetails) {
+      Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath()
+              .substring(tableInfo.getFilePath().lastIndexOf('-') + 1,
+                      tableInfo.getFilePath().lastIndexOf('.')));
+      if ((blockTimeStamp > updatedVODetails.getFactTimestamp() && (
+              updatedVODetails.getUpdateDeltaStartTimestamp() != null
+                      && blockTimeStamp < updatedVODetails.getUpdateDeltaStartTimestamp()))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * return the Array of available local-dirs
    *
    * @param conf
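
The new isInvalidTableBlock helper above decides staleness from timestamps embedded in file names. A hedged, simplified restatement of that rule (parameter names are illustrative): a block is invalid when its timestamp falls after the original fact load but before the start of the newest update delta, i.e. it has been superseded by a later update.

    def isStaleBlock(blockFilePath: String,
        factTimestamp: Long,
        updateDeltaStartTimestamp: Option[Long]): Boolean = {
      // e.g. ".../part-0-3-1481084721319.carbondata" -> 1481084721319
      val blockTimestamp = blockFilePath
        .substring(blockFilePath.lastIndexOf('-') + 1, blockFilePath.lastIndexOf('.'))
        .toLong
      updateDeltaStartTimestamp.exists(deltaStart =>
        blockTimestamp > factTimestamp && blockTimestamp < deltaStart)
    }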

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 1392bc9..3e070a8 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -158,118 +158,6 @@ public final class CarbonDataMergerUtil {
     }
 
   }
-  
-  
-  /**
-   * Update the Table Status file After IUD update delta compaction.
-   * @param loadsToMerge
-   * @param mergeList
-   * @param metaDataFilepath
-   * @param MergedLoadName
-   * @param carbonLoadModel
-   * @return
-   */
-
-  public static boolean updateLoadMetadataIUDFactFileCompaction(
-      List<LoadMetadataDetails> loadsToMerge, List<String> mergeList, String metaDataFilepath,
-      String MergedLoadName, CarbonLoadModel carbonLoadModel) {
-
-    boolean tableStatusUpdationStatus = false;
-
-    Map<String, String> mergeMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    if (mergeList == null) return false;
-
-    for (String mergeElem : mergeList) {
-      String[] strArray = mergeElem.split(",");
-      String orgSegmentname = strArray[0];
-      String mergedSegmentname = strArray[1];
-      mergeMap.put(orgSegmentname, mergedSegmentname);
-    }
-
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
-            + carbonLoadModel.getTableName() + " for table status updation ");
-
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
-
-        LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        String mergedLoadNumber = MergedLoadName.substring(
-            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
-                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
-
-        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
-        for (LoadMetadataDetails loadDetail : loadDetails) {
-          // check if this segment is merged.
-          if (loadsToMerge.contains(loadDetail)) {
-            // if the compacted load is deleted after the start of the compaction process,
-            // then need to discard the compaction process and treat it as failed compaction.
-            if (loadDetail.getLoadStatus()
-                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
-                  + " is deleted after the compaction is started.");
-              return tableStatusUpdationStatus;
-            }
-            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
-            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
-            loadDetail.setMergedLoadName(mergeMap.get(loadDetail.getLoadName()));
-          }
-        }
-        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
-
-        // create entry for merged one.
-        for (String mergedList : mergeList) {
-          LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-          loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
-          long loadEnddate = CarbonUpdateUtil.readCurrentTime();
-          loadMetadataDetails.setLoadEndTime(loadEnddate);
-          loadMetadataDetails.setLoadName(mergeMap.get(mergedList.split(",")[0]));
-          loadMetadataDetails
-              .setLoadStartTime(Long.parseLong(carbonLoadModel.getFactTimeStamp()));
-          loadMetadataDetails.setPartitionCount("0");
-          updatedDetailsList.add(loadMetadataDetails);
-        }
-
-        // put the merged folder entry
-        try {
-          segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
-              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
-          tableStatusUpdationStatus = true;
-        } catch (IOException e) {
-          LOGGER.error("Error while writing metadata");
-        }
-      } else {
-        LOGGER.error(
-            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + "for table status updation");
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
-            .getDatabaseName() + "." + carbonLoadModel.getTableName());
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + " during table status updation");
-      }
-    }
-    return tableStatusUpdationStatus;
-
-  }
 
   /**
    * Update Both Segment Update Status and Table Status for the case of IUD Delete
@@ -429,98 +317,7 @@ public final class CarbonDataMergerUtil {
    * @param carbonLoadModel
    * @param compactionType
    * @return
-   *//*
-  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
-      String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
-      CompactionType compactionType) {
-
-    boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager();
-
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
-
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
-            + carbonLoadModel.getTableName() + " for table status updation ");
-
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
-
-        LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        String mergedLoadNumber = MergedLoadName.substring(
-            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
-                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
-
-        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
-        for (LoadMetadataDetails loadDetail : loadDetails) {
-          // check if this segment is merged.
-          if (loadsToMerge.contains(loadDetail)) {
-            // if the compacted load is deleted after the start of the compaction process,
-            // then need to discard the compaction process and treat it as failed compaction.
-            if (loadDetail.getLoadStatus()
-                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
-                  + " is deleted after the compaction is started.");
-              return tableStatusUpdationStatus;
-            }
-            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
-            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
-            loadDetail.setMergedLoadName(mergedLoadNumber);
-          }
-        }
-
-        // create entry for merged one.
-        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
-        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
-        loadMetadataDetails.setLoadEndTime(loadEnddate);
-        loadMetadataDetails.setLoadName(mergedLoadNumber);
-        loadMetadataDetails
-            .setLoadStartTime(Long.parseLong(carbonLoadModel.getFactTimeStamp()));
-        loadMetadataDetails.setPartitionCount("0");
-        // if this is a major compaction then set the segment as major compaction.
-        if (compactionType == CompactionType.MAJOR_COMPACTION) {
-          loadMetadataDetails.setMajorCompacted("true");
-        }
-
-        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
-
-        // put the merged folder entry
-        updatedDetailsList.add(loadMetadataDetails);
-
-        try {
-          segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
-              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
-          tableStatusUpdationStatus = true;
-        } catch (IOException e) {
-          LOGGER.error("Error while writing metadata");
-        }
-      } else {
-        LOGGER.error(
-            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + "for table status updation");
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
-            .getDatabaseName() + "." + carbonLoadModel.getTableName());
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + " during table status updation");
-      }
-    }
-    return tableStatusUpdationStatus;
-  }*/
+   */
   public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
       String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
       long mergeLoadStartTime, CompactionType compactionType) {
@@ -627,6 +424,15 @@ public final class CarbonDataMergerUtil {
 
     sortSegments(sortedSegments);
 
+    // Check for segments which are qualified for IUD compaction.
+    if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+
+      List<LoadMetadataDetails> listOfSegmentsToBeMerged =
+          identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel);
+
+      return listOfSegmentsToBeMerged;
+    }
+
     // check preserve property and preserve the configured number of latest loads.
 
     List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
@@ -978,10 +784,7 @@ public final class CarbonDataMergerUtil {
     List<LoadMetadataDetails> validList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (LoadMetadataDetails segment : loadMetadataDetails) {
-      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-          || segment.getLoadStatus()
-          .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || segment
-          .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE)) {
+      if (isSegmentValid(segment)) {
         validList.add(segment);
       }
     }
@@ -1116,37 +919,20 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD(
-          List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel,
-          CompactionType compactionType) {
+      List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel) {
 
     List<LoadMetadataDetails> validSegments = new ArrayList<>(segments.size());
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-            carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    if (compactionType.equals(CompactionType.IUD_FACTFILE_COMPACTION)) {
-      /**
-       * Check each segments in a loop if it qualifies for IUD Compaction i.e.
-       * if the number of Delete Delta Files are than Threshold or Size of the
-       * cumulative Delete Delta is greater than threshold.
-       */
-      int numberDeleteDeltaFilesThreshold =
-              CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      for (LoadMetadataDetails seg : segments) {
-        if (isSegmentValid(seg) && checkDeleteDeltaFilesInSeg(seg.getLoadName(),
-                carbonLoadModel.getSegmentUpdateStatusManager(), numberDeleteDeltaFilesThreshold)) {
-          validSegments.add(seg);
-        }
-      }
-    } else if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
-      int numberUpdateDeltaFilesThreshold =
-              CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
-      for (LoadMetadataDetails seg : segments) {
-        if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(),
-                absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
-                numberUpdateDeltaFilesThreshold)) {
-          validSegments.add(seg);
-        }
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    int numberUpdateDeltaFilesThreshold =
+        CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
+    for (LoadMetadataDetails seg : segments) {
+      if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(),
+          absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
+          numberUpdateDeltaFilesThreshold)) {
+        validSegments.add(seg);
       }
     }
     return validSegments;
@@ -1158,6 +944,7 @@ public final class CarbonDataMergerUtil {
             .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
             .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
   }
+
   /**
    * method gets the segments list which get qualified for IUD compaction.
    * @param Segments
@@ -1166,28 +953,18 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static List<String> getSegListIUDCompactionQualified(List<String> Segments,
-                                                              AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                              SegmentUpdateStatusManager segmentUpdateStatusManager,
-                                                              CompactionType compactionTypeIUD) {
+      AbsoluteTableIdentifier absoluteTableIdentifier,
+      SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
 
     List<String> validSegments = new ArrayList<>();
 
-    if (compactionTypeIUD.equals(CompactionType.IUD_FACTFILE_COMPACTION)) {
-      int numberDeleteDeltaFilesThreshold =
-              CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      for (String seg : Segments) {
-        if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
-                numberDeleteDeltaFilesThreshold)) {
-          validSegments.add(seg);
-        }
-      }
-    } else if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
+    if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
       int numberDeleteDeltaFilesThreshold =
-              CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
+          CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
       List<String> deleteSegments = new ArrayList<>();
       for (String seg : Segments) {
         if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
-                numberDeleteDeltaFilesThreshold)) {
+            numberDeleteDeltaFilesThreshold)) {
           deleteSegments.add(seg);
         }
       }
@@ -1197,7 +974,7 @@ public final class CarbonDataMergerUtil {
         // in case of Delete Horizontal Compaction.
         for (String segName : deleteSegments) {
           List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
-                  numberDeleteDeltaFilesThreshold);
+              numberDeleteDeltaFilesThreshold);
           if (tempSegments != null) {
             for (String tempSeg : tempSegments) {
               validSegments.add(tempSeg);
@@ -1207,10 +984,10 @@ public final class CarbonDataMergerUtil {
       }
     } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
       int numberUpdateDeltaFilesThreshold =
-              CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
+          CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
       for (String seg : Segments) {
-        if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier,segmentUpdateStatusManager,
-                numberUpdateDeltaFilesThreshold)) {
+        if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager,
+            numberUpdateDeltaFilesThreshold)) {
           validSegments.add(seg);
         }
       }
@@ -1226,7 +1003,7 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName,
-                                                   SegmentUpdateStatusManager segmentUpdateStatusManager) {
+      SegmentUpdateStatusManager segmentUpdateStatusManager) {
 
     List<String> list = segmentUpdateStatusManager.getUpdateDeltaFiles(seg);
 
@@ -1253,25 +1030,24 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static Boolean checkUpdateDeltaFilesInSeg(String seg,
-                                                   AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                   SegmentUpdateStatusManager segmentUpdateStatusManager,
-                                                   int numberDeltaFilesThreshold) {
+      AbsoluteTableIdentifier absoluteTableIdentifier,
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
 
     String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
     CarbonFile segDir =
-            FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();
 
     updateDeltaFiles = segmentUpdateStatusManager
-            .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
-                    false, allSegmentFiles);
+        .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
+            false, allSegmentFiles);
 
     if (updateDeltaFiles == null) {
       return false;
@@ -1286,7 +1062,7 @@ public final class CarbonDataMergerUtil {
       // part-0-3-1481084721319.carbondata => "3-1481084721319"
       String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
       String timestamp =
-              CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+          CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
       String taskAndTimeStamp = task + "-" + timestamp;
       uniqueBlocks.add(taskAndTimeStamp);
     }
@@ -1308,15 +1084,15 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   private static boolean checkDeleteDeltaFilesInSeg(String seg,
-                                                    SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
     Set<String> uniqueBlocks = new HashSet<String>();
     List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
 
     for (final String blockName : blockNameList) {
 
-      CarbonFile[] deleteDeltaFiles = segmentUpdateStatusManager
-              .getDeleteDeltaFilesList(seg, blockName);
+      CarbonFile[] deleteDeltaFiles =
+          segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
 
       // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
       // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
@@ -1327,7 +1103,7 @@ public final class CarbonDataMergerUtil {
         // part-0-3-1481084721319.carbondata => "3-1481084721319"
         String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
         String timestamp =
-                CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+            CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
         String taskAndTimeStamp = task + "-" + timestamp;
         uniqueBlocks.add(taskAndTimeStamp);
       }
@@ -1350,18 +1126,15 @@ public final class CarbonDataMergerUtil {
    */
 
   private static List<String> getDeleteDeltaFilesInSeg(String seg,
-                                                       SegmentUpdateStatusManager segmentUpdateStatusManager,
-                                                       int numberDeltaFilesThreshold) {
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
     List<String> blockLists = new ArrayList<>();
-
-    List<String> blockNameList =
-            segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+    List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
 
     for (final String blockName : blockNameList) {
 
-      CarbonFile[] deleteDeltaFiles = segmentUpdateStatusManager
-              .getDeleteDeltaFilesList(seg, blockName);
+      CarbonFile[] deleteDeltaFiles =
+          segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
 
       if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
         blockLists.add(seg + "/" + blockName);
@@ -1376,8 +1149,8 @@ public final class CarbonDataMergerUtil {
    */
   public static boolean isHorizontalCompactionEnabled() {
     if ((CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled,
-                    CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)).equalsIgnoreCase("true")) {
+        .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled,
+            CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)).equalsIgnoreCase("true")) {
       return true;
     } else {
       return false;
@@ -1396,24 +1169,24 @@ public final class CarbonDataMergerUtil {
    * @throws IOException
    */
   public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg,
-                                                                              String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
-                                                                              SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
+      String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
+      SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
 
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-            new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
 
     List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
 
     // set the update status.
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
 
-    CarbonFile[] deleteDeltaFiles = segmentUpdateStatusManager
-            .getDeleteDeltaFilesList(seg, blockName);
+    CarbonFile[] deleteDeltaFiles =
+        segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
 
     String destFileName =
-            blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
+        blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
     String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
-            + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
+        + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
 
     List<String> deleteFilePathList = new ArrayList<String>();
     for (CarbonFile cFile : deleteDeltaFiles) {
@@ -1435,7 +1208,7 @@ public final class CarbonDataMergerUtil {
       resultList.add(blockDetails);
     } catch (IOException e) {
       LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
-              + fullBlockFilePath);
+          + fullBlockFilePath);
       throw new IOException();
     }
     return resultList;
@@ -1449,7 +1222,7 @@ public final class CarbonDataMergerUtil {
    * @return
    */
   public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles,
-                                                        String blockName, String fullBlockFilePath) throws IOException {
+      String blockName, String fullBlockFilePath) throws IOException {
 
     DeleteDeltaBlockDetails deleteDeltaBlockDetails = null;
     CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
@@ -1630,25 +1403,4 @@ public final class CarbonDataMergerUtil {
     }
 
   }
-
-  /**
-   *
-   * @param blkName
-   * @param invalidBlocks
-   * @return
-   */
-  public static Boolean checkUpdateDeltaAndFactMatchBlock(String blkName,
-                                                          List<String> invalidBlocks) {
-
-    String fullBlock = blkName;
-    String[] FileParts = fullBlock.split(CarbonCommonConstants.FILE_SEPARATOR);
-    String blockName = FileParts[FileParts.length - 1];
-
-    if (!invalidBlocks.contains(blockName)) {
-      return true;
-    }
-
-    return false;
-  }
-
 }
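
A hedged sketch of the threshold check behind checkUpdateDeltaFilesInSeg/checkDeleteDeltaFilesInSeg above: delta files are reduced to a unique "taskNo-timestamp" key so spill-over files of the same block count only once, and a segment qualifies when the unique count exceeds the configured threshold. The parsing assumes the "part-<part>-<task>-<timestamp>" naming shown in the code comments.

    def qualifiesForHorizontalCompaction(deltaFileNames: Seq[String],
        threshold: Int): Boolean = {
      val uniqueBlocks = deltaFileNames.map { fileName =>
        // "part-0-3-1481084721319.carbondata" -> "3-1481084721319"
        val parts = fileName.substring(0, fileName.lastIndexOf('.')).split("-")
        parts(parts.length - 2) + "-" + parts(parts.length - 1)
      }.toSet
      uniqueBlocks.size > threshold
    }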

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
index c44b94f..ea5fbcf 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
@@ -27,7 +27,6 @@ package org.apache.carbondata.spark.merger;
 public enum CompactionType {
     MINOR_COMPACTION,
     MAJOR_COMPACTION,
-    IUD_FACTFILE_COMPACTION,
     IUD_UPDDEL_DELTA_COMPACTION,
     IUD_DELETE_DELTA_COMPACTION,
     NONE

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 8247296..7825f5e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -95,12 +95,12 @@ class PartitionResultImpl extends PartitionResult[Int, Boolean] {
 }
 
 trait MergeResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
+  def getKey(key: String, value: Boolean): (K, V)
 
 }
 
-class MergeResultImpl extends MergeResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+class MergeResultImpl extends MergeResult[String, Boolean] {
+  override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
 }
 
 trait DeletedLoadResult[K, V] extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
new file mode 100644
index 0000000..841189b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.List
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.sql.execution.command.CarbonMergerMapping
+
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.hadoop.{CarbonMultiBlockSplit, CarbonInputFormat, CarbonInputSplit}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.MergeResult
+import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+
+/**
+ * IUD carbon merger RDD
+ * */
+class CarbonIUDMergerRDD[K, V](
+    sc: SparkContext,
+    result: MergeResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    carbonMergerMapping: CarbonMergerMapping,
+    confExecutorsTemp: String)
+  extends CarbonMergerRDD[K, V](sc,
+    result,
+    carbonLoadModel,
+    carbonMergerMapping,
+    confExecutorsTemp) {
+
+  override def getPartitions: Array[Partition] = {
+    val startTime = System.currentTimeMillis()
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
+      hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+    )
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    var defaultParallelism = sparkContext.defaultParallelism
+    val result = new util.ArrayList[Partition](defaultParallelism)
+
+    // mapping of the node and block list.
+    var nodeMapping: util.Map[String, util.List[Distributable]] = new
+        util.HashMap[String, util.List[Distributable]]
+
+    var noOfBlocks = 0
+
+    val taskInfoList = new util.ArrayList[Distributable]
+
+    var blocksOfLastSegment: List[TableBlockInfo] = null
+
+    CarbonInputFormat.setSegmentsToAccess(
+      job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
+
+    // get splits
+    val splits = format.getSplits(job)
+    val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+
+    // group blocks by segment.
+    val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId)
+
+    var i = 0
+
+    // No need to get a new SegmentUpdateStatus Manager as the Object is passed
+    // in CarbonLoadModel.
+    // val manager = new SegmentUpdateStatusManager(absoluteTableIdentifier)
+    val updateStatusManager = carbonLoadModel.getSegmentUpdateStatusManager
+
+
+    // make one spark partition for one segment
+    val resultSplits = splitsGroupedMySegment.map(entry => {
+      val (segName, splits) = (entry._1, entry._2)
+      val invalidBlocks = updateStatusManager.getInvalidBlockList(segName)
+      val validSplits = splits.filter( inputSplit =>
+        CarbonDataMergerUtil
+          .checkUpdateDeltaMatchBlock(segName, inputSplit.getBlockPath, updateStatusManager)
+      )
+
+      if (!validSplits.isEmpty) {
+        val locations = validSplits(0).getLocations
+        new CarbonSparkPartition(id, i,
+          new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations(0)))
+      }
+      else {
+        null
+      }
+    }
+    ).filter( _ != null)
+
+    // max segment cardinality is calculated in executor for each segment
+    carbonMergerMapping.maxSegmentColCardinality = null
+    carbonMergerMapping.maxSegmentColumnSchemaList = null
+
+    // Log the distribution
+    val noOfTasks = resultSplits.size
+    logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
+            + s"parallelism: $defaultParallelism , no.of.nodes: unknown, no.of.tasks: $noOfTasks"
+    )
+    logInfo("Time taken to identify Blocks to scan : " + (System
+                                                            .currentTimeMillis() - startTime)
+    )
+    resultSplits.foreach { partition =>
+      val cp = partition.asInstanceOf[CarbonSparkPartition]
+      logInfo(s"Node : " + cp.multiBlockSplit.getLocations.toSeq.mkString(",")
+              + ", No.Of Blocks : " + cp.multiBlockSplit.getLength
+      )
+    }
+    resultSplits.toArray
+  }
+}
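
In contrast to the parent CarbonMergerRDD, getPartitions here builds one Spark partition per segment and keeps only the splits whose blocks still match an update delta. A hedged sketch of that grouping (the Split type and the validity predicate are illustrative, not CarbonData types):

    case class Split(segmentId: String, blockPath: String)

    def onePartitionPerSegment(splits: Seq[Split],
        isUpdatedBlock: Split => Boolean): Seq[(String, Seq[Split])] =
      splits.groupBy(_.segmentId).toSeq.flatMap { case (segmentId, segmentSplits) =>
        val validSplits = segmentSplits.filter(isUpdatedBlock)
        // a segment with no block left to merge produces no compaction task
        if (validSplits.nonEmpty) Some(segmentId -> validSplits) else None
      }

One partition per segment keeps the merged output aligned with the source segment, which is why the merged load name is reused for update-delta compaction in CarbonMergerRDD below.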

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2ff426b..d426e6a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -21,6 +21,9 @@ import java.io.IOException
 import java.util
 import java.util.{Collections, List}
 
+import org.apache.carbondata.core.update.UpdateVO
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
@@ -35,8 +38,9 @@ import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TableTaskInfo, TaskBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
+import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
@@ -46,7 +50,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType, RowResultMerger}
 import org.apache.carbondata.spark.splits.TableSplit
 
 class CarbonMergerRDD[K, V](
@@ -61,7 +65,8 @@ class CarbonMergerRDD[K, V](
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
 
   var storeLocation: String = null
-  val storePath = carbonMergerMapping.storePath
+  var mergeResult: String = null
+  val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
   val metadataFilePath = carbonMergerMapping.metadataFilePath
   val mergedLoadName = carbonMergerMapping.mergedLoadName
   val databaseName = carbonMergerMapping.databaseName
@@ -104,25 +109,48 @@ class CarbonMergerRDD[K, V](
       try {
         val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
 
-        // get destination segment properties as sent from driver which is of last segment.
-
-        val segmentProperties = new SegmentProperties(
-          carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
-          carbonMergerMapping.maxSegmentColCardinality)
-
         // sorting the table block info List.
         val splitList = carbonSparkPartition.split.value.getAllSplits
         val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
 
         Collections.sort(tableBlockInfoList)
 
+        // During UPDATE DELTA COMPACTION case all the blocks received in compute belongs to
+        // one segment, so max cardinality will be calculated from first block of segment
+        if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+          var dataFileFooter: DataFileFooter = null
+          try {
+            // As the tableBlockInfoList is sorted take the ColCardinality from the last
+            // Block of the sorted list as it will have the last updated cardinality.
+            // Blocks are sorted by order of updation using TableBlockInfo.compare method so
+            // the last block after the sort will be the latest one.
+            dataFileFooter = CarbonUtil
+              .readMetadatFile(tableBlockInfoList.get(tableBlockInfoList.size() - 1))
+          } catch {
+            case e: CarbonUtilException =>
+              logError("Exception in preparing the data file footer for compaction " + e.getMessage)
+              throw e
+          }
+          // target load name will be same as source load name in case of update data compaction
+          carbonMergerMapping.mergedLoadName = tableBlockInfoList.get(0).getSegmentId
+          carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
+            .getColumnCardinality
+          carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
+            .toList
+        }
+
+        // get destination segment properties as sent from driver which is of last segment.
+        val segmentProperties = new SegmentProperties(
+          carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
+          carbonMergerMapping.maxSegmentColCardinality)
+
         val segmentMapping: java.util.Map[String, TaskBlockInfo] =
           CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
 
         val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
           CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
 
-        carbonLoadModel.setStorePath(storePath)
+        carbonLoadModel.setStorePath(hdfsStoreLocation)
 
         exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
           carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, dataFileMetadataSegMapping)
@@ -140,10 +168,16 @@ class CarbonMergerRDD[K, V](
               sys.error("Exception occurred in query execution.Please check logs.")
             }
         }
-        mergeNumber = mergedLoadName
-          .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-                     CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
-          )
+
+        if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+          mergeNumber = tableBlockInfoList.get(0).getSegmentId
+        }
+        else {
+          mergeNumber = mergedLoadName
+            .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+                       CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+            )
+        }
 
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
           factTableName,
@@ -167,6 +201,8 @@ class CarbonMergerRDD[K, V](
           )
         mergeStatus = merger.mergerSlice()
 
+        mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
+
       } catch {
         case e: Exception =>
           LOGGER.error(e)
@@ -190,19 +226,13 @@ class CarbonMergerRDD[K, V](
       var finished = false
 
       override def hasNext: Boolean = {
-        if (!finished) {
-          finished = true
-          finished
-        } else {
-          !finished
-        }
+        !finished
       }
 
       override def next(): (K, V) = {
         finished = true
-        result.getKey(0, mergeStatus)
+        result.getKey(mergeResult, mergeStatus)
       }
-
     }
     iter
   }
@@ -215,8 +245,10 @@ class CarbonMergerRDD[K, V](
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-      storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+      hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
+    val updateStatusManger: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
+      absoluteTableIdentifier)
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
@@ -227,11 +259,17 @@ class CarbonMergerRDD[K, V](
     var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
         util.HashMap[String, util.List[Distributable]]
 
-    val noOfBlocks = 0
+    var noOfBlocks = 0
+    val taskInfoList = new util.ArrayList[Distributable]
     var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
 
+    var blocksOfLastSegment: List[TableBlockInfo] = null
+
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
+      // map for keeping the relation of a task and its blocks.
+      val taskIdMapping: util.Map[String, util.List[TableBlockInfo]] = new
+          util.HashMap[String, util.List[TableBlockInfo]]
 
       // map for keeping the relation of a task and its blocks.
       job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
@@ -239,6 +277,44 @@ class CarbonMergerRDD[K, V](
       // get splits
       val splits = format.getSplits(job)
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+
+      val updateDetails: UpdateVO = updateStatusManger.getInvalidTimestampRange(eachSeg)
+
+      // take the blocks of one segment.
+      val blocksOfOneSegment = carbonInputSplits.map(inputSplit =>
+        new TableBlockInfo(inputSplit.getPath.toString,
+          inputSplit.getStart, inputSplit.getSegmentId,
+          inputSplit.getLocations, inputSplit.getLength, inputSplit.getVersion
+        )
+      )
+        .filter(blockInfo => !CarbonLoaderUtil
+          .isInvalidTableBlock(blockInfo, updateDetails, updateStatusManger))
+
+      // keep on assigning till last one is reached.
+      if (null != blocksOfOneSegment && blocksOfOneSegment.size > 0) {
+        blocksOfLastSegment = blocksOfOneSegment.asJava
+      }
+
+      // populate the task and its block mapping.
+      blocksOfOneSegment.foreach(f = tableBlockInfo => {
+        val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath)
+        val blockList = taskIdMapping.get(taskNo)
+        if (null == blockList) {
+          val blockListTemp = new util.ArrayList[TableBlockInfo]()
+          blockListTemp.add(tableBlockInfo)
+          taskIdMapping.put(taskNo, blockListTemp)
+        }
+        else {
+          blockList.add(tableBlockInfo)
+        }
+      })
+
+      noOfBlocks += blocksOfOneSegment.size
+      taskIdMapping.asScala.foreach(
+        entry =>
+          taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
+      )
+
     }
 
     // prepare the details required to extract the segment properties using last segment.
@@ -261,15 +337,15 @@ class CarbonMergerRDD[K, V](
         .toList
     }
 
-    val blocks = carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava
+    // val blocks = carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava
     // send complete list of blocks to the mapping util.
-    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blocks, -1)
+    nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
 
     val confExecutors = confExecutorsTemp.toInt
     val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
       confExecutors
     } else { nodeBlockMapping.size() }
-    DistributionUtil.ensureExecutors(sparkContext, requiredExecutors, blocks.size)
+    DistributionUtil.ensureExecutors(sparkContext, requiredExecutors, taskInfoList.size)
     logInfo("No.of Executors required=" + requiredExecutors +
             " , spark.executor.instances=" + confExecutors +
             ", no.of.nodes where data present=" + nodeBlockMapping.size())

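The new getPartitions logic filters out blocks invalidated by update/delete deltas, groups the surviving blocks of each segment by the task number encoded in their file names, and hands the resulting task list to nodeBlockMapping instead of the raw splits. A rough standalone sketch of the grouping step, with TableBlockInfo reduced to a file path and a hypothetical task-number parser standing in for CarbonTablePath.DataFileUtil.getTaskNo:

    import java.util

    object TaskGroupingSketch {
      // Simplified stand-in for TableBlockInfo; only the file path matters here.
      case class BlockInfo(filePath: String)

      // Hypothetical parser; the real task number comes from
      // CarbonTablePath.DataFileUtil.getTaskNo(filePath).
      def getTaskNo(filePath: String): String = {
        val parts = filePath.split("-")
        if (parts.length > 2) parts(2) else "0"
      }

      // Group the blocks of one segment by task id, mirroring taskIdMapping above.
      def groupByTask(blocks: Seq[BlockInfo]): util.Map[String, util.List[BlockInfo]] = {
        val taskIdMapping = new util.HashMap[String, util.List[BlockInfo]]
        blocks.foreach { block =>
          val taskNo = getTaskNo(block.filePath)
          val blockList = taskIdMapping.get(taskNo)
          if (blockList == null) {
            val newList = new util.ArrayList[BlockInfo]()
            newList.add(block)
            taskIdMapping.put(taskNo, newList)
          } else {
            blockList.add(block)
          }
        }
        taskIdMapping
      }
    }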
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 6eb11c7..e06937b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.spark.MergeResultImpl
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.merger.{CompactionType, CarbonDataMergerUtil}
 
 /**
  * Compactor class which handled the compaction cases.
@@ -55,7 +55,7 @@ object Compactor {
     val factTableName = carbonLoadModel.getTableName
     val validSegments: Array[String] = CarbonDataMergerUtil
       .getValidSegments(loadsToMerge).split(',')
-    val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime();
+    val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
     val carbonMergerMapping = CarbonMergerMapping(storeLocation,
       storePath,
       carbonTable.getMetaDataFilepath,
@@ -70,7 +70,7 @@ object Compactor {
       maxSegmentColCardinality = null,
       maxSegmentColumnSchemaList = null
     )
-    carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
+    carbonLoadModel.setStorePath(carbonMergerMapping.hdfsStoreLocation)
     carbonLoadModel.setLoadMetadataDetails(
       SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
     var execInstance = "1"
@@ -87,13 +87,24 @@ object Compactor {
       }
     }
 
-    val mergeStatus = new CarbonMergerRDD(
-      sc.sparkContext,
-      new MergeResultImpl(),
-      carbonLoadModel,
-      carbonMergerMapping,
-      execInstance
-    ).collect
+    val mergeStatus =
+    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+      new CarbonIUDMergerRDD(
+        sc.sparkContext,
+        new MergeResultImpl(),
+        carbonLoadModel,
+        carbonMergerMapping,
+        execInstance
+      ).collect
+    } else {
+      new CarbonMergerRDD(
+        sc.sparkContext,
+        new MergeResultImpl(),
+        carbonLoadModel,
+        carbonMergerMapping,
+        execInstance
+      ).collect
+    }
 
     if (mergeStatus.length == 0) {
       finalMergeStatus = false
@@ -104,10 +115,18 @@ object Compactor {
     if (finalMergeStatus) {
       val endTime = System.nanoTime()
       logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
-      if (!CarbonDataMergerUtil
-        .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
-          mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
-        )) {
+      val statusFileUpdation =
+        (((compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) &&
+          (CarbonDataMergerUtil
+            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
+              carbonTable.getMetaDataFilepath(),
+              carbonLoadModel))) ||
+         (CarbonDataMergerUtil
+           .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(),
+             mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType))
+          )
+
+      if (!statusFileUpdation) {
         logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
         logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +

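With this change Compactor picks the merger RDD by compaction type: IUD update/delete delta compaction runs through the new CarbonIUDMergerRDD, while every other type keeps using CarbonMergerRDD. A hedged sketch of that dispatch, with the two RDD runs reduced to plain functions:

    // Sketch of the compaction-type dispatch in Compactor; the runner functions
    // stand in for CarbonIUDMergerRDD(...).collect and CarbonMergerRDD(...).collect.
    object MergerDispatchSketch {
      sealed trait Kind
      case object IudUpdDelDelta extends Kind
      case object Regular extends Kind

      def runMerge(kind: Kind,
          runIudMerge: () => Array[(String, Boolean)],
          runRegularMerge: () => Array[(String, Boolean)]): Array[(String, Boolean)] = {
        kind match {
          case IudUpdDelDelta => runIudMerge()   // horizontal delta-file compaction
          case Regular => runRegularMerge()      // minor / major segment compaction
        }
      }
    }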
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 8eca81a..47fdf89 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -22,8 +22,7 @@ import java.util.concurrent._
 
 import org.apache.carbondata.core.update.CarbonUpdateUtil
 import org.apache.carbondata.core.updatestatus.SegmentStatusManager
-import org.apache.carbondata.locks.{LockUsage, CarbonLockUtil, CarbonLockFactory}
-
+import org.apache.carbondata.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
@@ -171,7 +170,9 @@ object DataManagementFunc {
       segList,
       compactionModel.compactionType
     )
-    while (loadsToMerge.size() > 1) {
+    while (loadsToMerge.size() > 1 ||
+           (compactionModel.compactionType.name().equals("IUD_UPDDEL_DELTA_COMPACTION") &&
+            loadsToMerge.size() > 0)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
       deletePartialLoadsInCompaction(carbonLoadModel)
       val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
@@ -202,7 +203,6 @@ object DataManagementFunc {
           throw e
       }
 
-
       // scan again and determine if anything is there to merge again.
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       segList = carbonLoadModel.getLoadMetadataDetails
@@ -212,15 +212,23 @@ object DataManagementFunc {
       if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
 
         segList = CarbonDataMergerUtil
-            .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+          .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+      }
+
+      if (compactionModel.compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+        loadsToMerge.clear()
+      } else if (segList.size > 0) {
+        loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+          storePath,
+          carbonLoadModel,
+          compactionModel.compactionSize,
+          segList,
+          compactionModel.compactionType
+        )
+      }
+      else {
+        loadsToMerge.clear()
       }
-      loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-        storePath,
-        carbonLoadModel,
-        compactionModel.compactionSize,
-        segList,
-        compactionModel.compactionType
-      )
     }
   }
 

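The reworked compaction loop lets IUD update/delete delta compaction start with a single eligible segment (loadsToMerge.size() > 0) while ordinary compaction still needs at least two loads, and it clears loadsToMerge after one IUD pass so the loop terminates instead of re-identifying segments. A small sketch of that loop-control decision, assuming a string-typed compaction type rather than CarbonData's enum:

    // Sketch of the loop-control decision in the compaction loop above; the
    // compaction type is a plain string here instead of CompactionType.
    object CompactionLoopSketch {
      val IudUpdDelDelta = "IUD_UPDDEL_DELTA_COMPACTION"

      // Ordinary compaction needs at least two loads to merge; IUD delta
      // compaction can proceed with a single load because it only rewrites
      // that load's update and delete delta files.
      def shouldContinue(compactionType: String, loadsToMergeCount: Int): Boolean = {
        loadsToMergeCount > 1 ||
          (compactionType == IudUpdDelDelta && loadsToMergeCount > 0)
      }
    }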
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b8479ce..3b2895f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -81,9 +81,9 @@ case class DataLoadTableFileMapping(table: String, loadPath: String)
 case class ExecutionErrors(var failureCauses: FailureCauses, var errorMsg: String )
 
 case class CarbonMergerMapping(storeLocation: String,
-    storePath: String,
+    hdfsStoreLocation: String,
     metadataFilePath: String,
-    mergedLoadName: String,
+    var mergedLoadName: String,
     kettleHomePath: String,
     tableCreationTime: Long,
     databaseName: String,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index f426d65..0d996b7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -359,47 +359,29 @@ object IUDCommon {
     LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].")
     LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].")
 
-
-    if (compactionTypeIUD == CompactionType.IUD_FACTFILE_COMPACTION ||
-        compactionTypeIUD == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
-
-      try {
-        // Update Compaction.
-        if (compactionTypeIUD == CompactionType.IUD_FACTFILE_COMPACTION) {
-          val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
-            carbonTable.getFactTableName,
-            Some(segmentUpdateStatusManager),
-            CompactionType.IUD_FACTFILE_COMPACTION.toString,
-            Some(factTimeStamp),
-            "")
-
-          AlterTableCompaction(altertablemodel).run(sqlContext)
-          return
-        }
-        else {
-          val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
-            carbonTable.getFactTableName,
-            Some(segmentUpdateStatusManager),
-            CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
-            Some(factTimeStamp),
-            "")
-
-          AlterTableCompaction(altertablemodel).run(sqlContext)
+    try {
+      // Update Compaction.
+      val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
+        carbonTable.getFactTableName,
+        Some(segmentUpdateStatusManager),
+        CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
+        Some(factTimeStamp),
+        "")
+
+      AlterTableCompaction(altertablemodel).run(sqlContext)
+    }
+    catch {
+      case e: Exception =>
+        val msg = if (null != e.getMessage) {
+          e.getMessage
+        } else {
+          "Please check logs for more info"
         }
-      }
-      catch {
-        case e: Exception =>
-          val msg = if (null != e.getMessage) {
-            e.getMessage
-          } else {
-            "Please check logs for more info"
-          }
-          throw new HorizontalCompactionException(
-            s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
-      }
+        throw new HorizontalCompactionException(
+          s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
     }
-    LOG.info(s"Horizontal Update Compaction operation completed for [${db}.${table}].")
-    LOG.audit(s"Horizontal Update Compaction operation completed for [${db}.${table}].")
+    LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
+    LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
   }
 
   /**

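The simplified horizontal update compaction path always issues an IUD_UPDDEL_DELTA_COMPACTION alter-table request and converts any failure into a HorizontalCompactionException that carries the fact timestamp. A minimal sketch of that error-wrapping shape, using a placeholder exception class rather than the real one:

    // Placeholder mirroring the shape of HorizontalCompactionException; the
    // real class lives in the IUD command code and may carry more state.
    class HorizontalCompactionFailure(msg: String, val factTimeStamp: Long)
      extends RuntimeException(msg)

    object UpdateCompactionSketch {
      // Run the compaction action and wrap any failure with table context and
      // the timestamp needed for later cleanup.
      def runUpdateCompaction(db: String, table: String, factTimeStamp: Long)
          (compact: => Unit): Unit = {
        try {
          compact // e.g. AlterTableCompaction(alterTableModel).run(sqlContext)
        } catch {
          case e: Exception =>
            val msg = Option(e.getMessage).getOrElse("Please check logs for more info")
            throw new HorizontalCompactionFailure(
              s"Horizontal Update Compaction Failed for [$db.$table]. " + msg,
              factTimeStamp)
        }
      }
    }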
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cdae9ed4/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e147003..9bf2a03 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -73,7 +73,22 @@ object CarbonDataRDDFactory {
     if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
       compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
       compactionType = CompactionType.MAJOR_COMPACTION
-    } else {
+    } else if (alterTableModel.compactionType.equalsIgnoreCase("IUD_UPDDEL_DELTA_COMPACTION")) {
+      compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      if (alterTableModel.segmentUpdateStatusManager.get != None) {
+        carbonLoadModel
+          .setSegmentUpdateStatusManager(alterTableModel.segmentUpdateStatusManager.get)
+
+        carbonLoadModel
+          .setSegmentUpdateDetails(alterTableModel.segmentUpdateStatusManager.get
+            .getUpdateStatusDetails.toList.asJava)
+
+        carbonLoadModel
+          .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
+            .getLoadMetadataDetails.toList.asJava)
+      }
+    }
+    else {
       compactionType = CompactionType.MINOR_COMPACTION
     }
 
@@ -87,7 +102,12 @@ object CarbonDataRDDFactory {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
     }
     // reading the start time of data load.
-    val loadStartTime = CarbonLoaderUtil.readCurrentTime()
+    val loadStartTime : Long =
+    if (alterTableModel.factTimeStamp.isEmpty) {
+      CarbonUpdateUtil.readCurrentTime
+    } else {
+      alterTableModel.factTimeStamp.get
+    }
     carbonLoadModel.setFactTimeStamp(loadStartTime)
 
     val isCompactionTriggerByDDl = true
@@ -214,7 +234,11 @@ object CarbonDataRDDFactory {
       compactionLock: ICarbonLock): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
-    CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+    if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+      // update the updated table status. For the case of Update Delta Compaction the Metadata
+      // is filled in LoadModel, no need to refresh.
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+    }
 
     // clean up of the stale segments.
     try {


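In CarbonDataRDDFactory the compaction type string now resolves to one of three cases, and the load start time is reused from the alter-table model's factTimeStamp when one is supplied (the update path passes it in), falling back to the current time otherwise. A hedged sketch of both decisions under simplified types; currentTime stands in for CarbonUpdateUtil.readCurrentTime:

    // Sketch of the compaction-type resolution and start-time selection; the
    // string constants mirror the alter-table compaction names.
    object CompactionRequestSketch {
      sealed trait Kind
      case object Major extends Kind
      case object Minor extends Kind
      case object IudUpdDelDelta extends Kind

      def resolveType(compactionTypeName: String): Kind = {
        if (compactionTypeName.equalsIgnoreCase("major")) {
          Major
        } else if (compactionTypeName.equalsIgnoreCase("IUD_UPDDEL_DELTA_COMPACTION")) {
          IudUpdDelDelta
        } else {
          Minor
        }
      }

      // Reuse the fact timestamp supplied by the update path, otherwise take
      // the current time as the load start time.
      def loadStartTime(factTimeStamp: Option[Long], currentTime: () => Long): Long = {
        factTimeStamp.getOrElse(currentTime())
      }
    }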