carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [08/22] incubator-carbondata git commit: IUD update flow support
Date Fri, 06 Jan 2017 13:57:08 GMT
IUD update flow support


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0d42f52d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0d42f52d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0d42f52d

Branch: refs/heads/master
Commit: 0d42f52d9176135d52bd03243b5de19703b4c1ef
Parents: ff84a2e
Author: ravikiran23 <ravikiran.sn042@gmail.com>
Authored: Mon Jan 2 11:17:24 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../iudprocessor/iuddata/BlockMappingVO.java    |  37 +
 .../iudprocessor/iuddata/RowCountDetailsVO.java |  45 ++
 .../core/load/LoadMetadataDetails.java          | 166 +++-
 .../core/update/CarbonUpdateUtil.java           | 773 +++++++++++++++++++
 .../carbondata/core/update/TupleIdEnum.java     |   2 +-
 .../apache/carbondata/core/update/UpdateVO.java |  98 +++
 .../core/updatestatus/SegmentStatusManager.java | 643 +++++++++++++++
 .../carbondata/core/util/CarbonProperties.java  |  61 +-
 .../fileoperations/FileWriteOperation.java      |  25 +
 .../carbondata/locks/CarbonLockFactory.java     |  94 +++
 .../apache/carbondata/locks/HdfsFileLock.java   | 121 +++
 .../apache/carbondata/locks/LocalFileLock.java  | 164 ++++
 .../apache/carbondata/locks/ZookeeperInit.java  |  82 ++
 .../core/load/LoadMetadataDetailsUnitTest.java  |  21 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |  73 +-
 .../spark/load/DeleteLoadFolders.java           |  41 +-
 .../carbondata/spark/load/FailureCauses.java    |  30 +
 .../org/apache/carbondata/spark/KeyVal.scala    |  31 +
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |   3 +-
 .../carbondata/spark/util/CommonUtil.scala      |   2 +
 .../DataCompactionBlockletBoundryTest.scala     |   3 +-
 .../DataCompactionBoundaryConditionsTest.scala  |   3 +-
 .../DataCompactionCardinalityBoundryTest.scala  |  12 +-
 .../datacompaction/DataCompactionLockTest.scala |  24 +-
 .../DataCompactionMinorThresholdTest.scala      |   8 +-
 .../DataCompactionNoDictionaryTest.scala        |   7 +-
 .../datacompaction/DataCompactionTest.scala     |  15 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |  18 +-
 .../MajorCompactionStopsAfterCompaction.scala   |  14 +-
 .../exception/MultipleMatchingException.java    |  52 ++
 .../processing/model/CarbonLoadModel.java       |  61 +-
 .../carbondata/lcm/locks/LocalFileLockTest.java |   2 +
 .../lcm/locks/ZooKeeperLockingTest.java         |  17 +-
 .../carbondata/test/util/StoreCreator.java      |  36 +-
 34 files changed, 2627 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockMappingVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockMappingVO.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockMappingVO.java
new file mode 100644
index 0000000..2be8b56
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/BlockMappingVO.java
@@ -0,0 +1,37 @@
+package org.apache.carbondata.common.iudprocessor.iuddata;
+
+import java.util.Map;
+
+/**
+ * VO class to store the details of segment and block count, and of each block and its row count.
+ */
+public class BlockMappingVO {
+
+  private Map<String, Long> blockRowCountMapping ;
+
+  private Map<String, Long> segmentNumberOfBlockMapping ;
+
+  private Map<String, RowCountDetailsVO> completeBlockRowDetailVO;
+
+  public void setCompleteBlockRowDetailVO(Map<String, RowCountDetailsVO> completeBlockRowDetailVO) {
+    this.completeBlockRowDetailVO = completeBlockRowDetailVO;
+  }
+
+  public Map<String, RowCountDetailsVO> getCompleteBlockRowDetailVO() {
+    return completeBlockRowDetailVO;
+  }
+
+  public Map<String, Long> getBlockRowCountMapping() {
+    return blockRowCountMapping;
+  }
+
+  public Map<String, Long> getSegmentNumberOfBlockMapping() {
+    return segmentNumberOfBlockMapping;
+  }
+
+  public BlockMappingVO(Map<String, Long> blockRowCountMapping,
+      Map<String, Long> segmentNumberOfBlockMapping) {
+    this.blockRowCountMapping = blockRowCountMapping;
+    this.segmentNumberOfBlockMapping = segmentNumberOfBlockMapping;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/RowCountDetailsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/RowCountDetailsVO.java b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/RowCountDetailsVO.java
new file mode 100644
index 0000000..d0c74c5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/common/iudprocessor/iuddata/RowCountDetailsVO.java
@@ -0,0 +1,45 @@
+package org.apache.carbondata.common.iudprocessor.iuddata;
+
+import java.io.Serializable;
+
+/**
+ * VO class holding the row count details for a block.
+ */
+public class RowCountDetailsVO implements Serializable {
+
+  private static final long serialVersionUID = 1206104914918491749L;
+
+  private long totalNumberOfRows;
+
+  private long deletedRowsInBlock;
+
+  public RowCountDetailsVO(long totalNumberOfRows, long deletedRowsInBlock) {
+    this.totalNumberOfRows = totalNumberOfRows;
+    this.deletedRowsInBlock = deletedRowsInBlock;
+  }
+
+  public long getTotalNumberOfRows() {
+    return totalNumberOfRows;
+  }
+
+  public long getDeletedRowsInBlock() {
+    return deletedRowsInBlock;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RowCountDetailsVO that = (RowCountDetailsVO) o;
+
+    if (totalNumberOfRows != that.totalNumberOfRows) return false;
+    return deletedRowsInBlock == that.deletedRowsInBlock;
+
+  }
+
+  @Override public int hashCode() {
+    int result = (int) (totalNumberOfRows ^ (totalNumberOfRows >>> 32));
+    result = 31 * result + (int) (deletedRowsInBlock ^ (deletedRowsInBlock >>> 32));
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
index d527ba4..d6b1e35 100644
--- a/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/load/LoadMetadataDetails.java
@@ -19,15 +19,15 @@
 
 package org.apache.carbondata.core.load;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
 import java.io.Serializable;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
 public class LoadMetadataDetails implements Serializable {
 
   private static final long serialVersionUID = 1106104914918491724L;
@@ -35,6 +35,16 @@ public class LoadMetadataDetails implements Serializable {
   private String loadStatus;
   private String loadName;
   private String partitionCount;
+  private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
+
+  // update delta end timestamp
+  private String updateDeltaEndTimestamp = "";
+
+  // update delta start timestamp
+  private String updateDeltaStartTimestamp = "";
+
+  // this will represent the update status file name at that point of time.
+  private String updateStatusFileName = "";
 
   /**
    * LOGGER
@@ -42,6 +52,7 @@ public class LoadMetadataDetails implements Serializable {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
 
+  // don't remove static, as the write will fail.
   private static final SimpleDateFormat parser =
       new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
   /**
@@ -69,12 +80,12 @@ public class LoadMetadataDetails implements Serializable {
     this.partitionCount = partitionCount;
   }
 
-  public String getTimestamp() {
-    return timestamp;
+  public long getLoadEndTime() {
+    return convertTimeStampToLong(timestamp);
   }
 
-  public void setTimestamp(String timestamp) {
-    this.timestamp = timestamp;
+  public void setLoadEndTime(long timestamp) {
+    this.timestamp = getTimeStampConvertion(timestamp);;
   }
 
   public String getLoadStatus() {
@@ -96,15 +107,20 @@ public class LoadMetadataDetails implements Serializable {
   /**
    * @return the modificationOrdeletionTimesStamp
    */
-  public String getModificationOrdeletionTimesStamp() {
-    return modificationOrdeletionTimesStamp;
+  public long getModificationOrdeletionTimesStamp() {
+    if(null == modificationOrdeletionTimesStamp) {
+      return 0;
+    }
+    return convertTimeStampToLong(modificationOrdeletionTimesStamp);
   }
 
   /**
    * @param modificationOrdeletionTimesStamp the modificationOrdeletionTimesStamp to set
    */
-  public void setModificationOrdeletionTimesStamp(String modificationOrdeletionTimesStamp) {
-    this.modificationOrdeletionTimesStamp = modificationOrdeletionTimesStamp;
+  public void setModificationOrdeletionTimesStamp(long modificationOrdeletionTimesStamp) {
+
+    this.modificationOrdeletionTimesStamp =
+        getTimeStampConvertion(modificationOrdeletionTimesStamp);
   }
 
   /* (non-Javadoc)
@@ -142,28 +158,52 @@ public class LoadMetadataDetails implements Serializable {
   /**
    * @return the startLoadTime
    */
-  public String getLoadStartTime() {
-    return loadStartTime;
+  public long getLoadStartTime() {
+    return convertTimeStampToLong(loadStartTime);
   }
 
   /**
    * return loadStartTime
+   *
    * @return
    */
-  public Long getLoadStartTimeAsLong() {
-    return getTimeStamp(loadStartTime);
+  public long getLoadStartTimeAsLong() {
+    return (!loadStartTime.isEmpty()) ? getTimeStamp(loadStartTime) : 0;
   }
 
   /**
-   * returns load start time as long value
-   * @param loadStartTime
+   * This method will convert a given timestamp to long value and then to string back
+   *
+   * @param factTimeStamp
    * @return
    */
-  private Long getTimeStamp(String loadStartTime) {
-    if (loadStartTime.isEmpty()) {
-      return null;
+  private long convertTimeStampToLong(String factTimeStamp) {
+    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(factTimeStamp);
+      return dateToStr.getTime();
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
+      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+      try {
+        dateToStr = parser.parse(factTimeStamp);
+        return dateToStr.getTime();
+      } catch (ParseException e1) {
+        LOGGER
+            .error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e1.getMessage());
+        return 0;
+      }
     }
+  }
 
+  /**
+   * returns load start time as long value
+   *
+   * @param loadStartTime
+   * @return
+   */
+  public Long getTimeStamp(String loadStartTime) {
     Date dateToStr = null;
     try {
       dateToStr = parser.parse(loadStartTime);
@@ -173,11 +213,17 @@ public class LoadMetadataDetails implements Serializable {
       return null;
     }
   }
+
+  private String getTimeStampConvertion(long time) {
+    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    return sdf.format(time);
+  }
+
   /**
    * @param loadStartTime
    */
-  public void setLoadStartTime(String loadStartTime) {
-    this.loadStartTime = loadStartTime;
+  public void setLoadStartTime(long loadStartTime) {
+    this.loadStartTime = getTimeStampConvertion(loadStartTime);
   }
 
   /**
@@ -210,6 +256,7 @@ public class LoadMetadataDetails implements Serializable {
 
   /**
    * Return true if it is a major compacted segment.
+   *
    * @return
    */
   public String isMajorCompacted() {
@@ -218,9 +265,82 @@ public class LoadMetadataDetails implements Serializable {
 
   /**
    * Set true if it is a major compacted segment.
+   *
    * @param majorCompacted
    */
   public void setMajorCompacted(String majorCompacted) {
     this.majorCompacted = majorCompacted;
   }
+
+  /**
+   * To get isDeleted property.
+   *
+   * @return isDeleted
+   */
+  public String getIsDeleted() {
+    return isDeleted;
+  }
+
+  /**
+   * To set isDeleted property.
+   *
+   * @param isDeleted
+   */
+  public void setIsDeleted(String isDeleted) {
+    this.isDeleted = isDeleted;
+  }
+
+  /**
+   * To get the update delta end timestamp
+   *
+   * @return updateDeltaEndTimestamp
+   */
+  public String getUpdateDeltaEndTimestamp() {
+    return updateDeltaEndTimestamp;
+  }
+
+  /**
+   * To set the update delta end timestamp
+   *
+   * @param updateDeltaEndTimestamp
+   */
+  public void setUpdateDeltaEndTimestamp(String updateDeltaEndTimestamp) {
+    this.updateDeltaEndTimestamp = updateDeltaEndTimestamp;
+  }
+
+  /**
+   * To get the update delta start timestamp
+   *
+   * @return updateDeltaStartTimestamp
+   */
+  public String getUpdateDeltaStartTimestamp() {
+    return updateDeltaStartTimestamp;
+  }
+
+  /**
+   * To set the update delta start timestamp
+   *
+   * @param updateDeltaStartTimestamp
+   */
+  public void setUpdateDeltaStartTimestamp(String updateDeltaStartTimestamp) {
+    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
+  }
+
+  /**
+   * To get the updateStatusFileName
+   *
+   * @return updateStatusFileName
+   */
+  public String getUpdateStatusFileName() {
+    return updateStatusFileName;
+  }
+
+  /**
+   * To set the updateStatusFileName
+   *
+   * @param updateStatusFileName
+   */
+  public void setUpdateStatusFileName(String updateStatusFileName) {
+    this.updateStatusFileName = updateStatusFileName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
new file mode 100644
index 0000000..b404b61
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
@@ -0,0 +1,773 @@
+
+package org.apache.carbondata.core.update;
+
+import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO;
+import org.apache.carbondata.common.iudprocessor.iuddata.RowCountDetailsVO;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.locks.ICarbonLock;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ *
+ */
+public class CarbonUpdateUtil {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
+
+  /**
+   * returns the required field from a tuple id
+   *
+   * @param Tid the tuple id string
+   * @param tid the field to extract from the tuple id
+   * @return
+   */
+  public static String getRequiredFieldFromTID(String Tid, TupleIdEnum tid) {
+    return Tid.split("/")[tid.getTupleIdIndex()];
+  }
+
+  /**
+   * returns segment along with block id
+   * @param Tid
+   * @return
+   */
+  public static String getSegmentWithBlockFromTID(String Tid) {
+    return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID)
+            + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid, TupleIdEnum.BLOCK_ID);
+  }
+
+  /**
+   * Returns block path from tuple id
+   *
+   * @param tid
+   * @param factPath
+   * @return
+   */
+  public static String getTableBlockPath(String tid, String factPath) {
+    String part =
+            CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
+    String segment =
+            CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
+    return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
+            + CarbonCommonConstants.FILE_SEPARATOR + segment;
+
+  }
+
+  /**
+   * returns delete delta file path
+   *
+   * @param blockPath
+   * @param blockName
+   * @param timestamp
+   * @return
+   */
+  public static String getDeleteDeltaFilePath(String blockPath, String blockName,
+                                              String timestamp) {
+    return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
+            + CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
+
+  }
+
+  /**
+   * @param updateDetailsList
+   * @param table
+   * @param updateStatusFileIdentifier
+   * @return
+   */
+  public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
+                                            CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
+    boolean status = false;
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+            new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
+    boolean lockStatus = false;
+
+    try {
+      lockStatus = updateLock.lockWithRetries();
+      if (lockStatus) {
+
+        AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+        CarbonTablePath carbonTablePath = CarbonStorePath
+                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                        absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        // read the existing file if present and update the same.
+        SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager
+                .getUpdateStatusDetails();
+
+        List<SegmentUpdateDetails> oldList = new ArrayList(Arrays.asList(oldDetails));
+
+        for (SegmentUpdateDetails newBlockEntry : updateDetailsList) {
+          int index = oldList.indexOf(newBlockEntry);
+          if (index != -1) {
+            // update the element in existing list.
+            SegmentUpdateDetails blockDetail = oldList.get(index);
+            if(blockDetail.getDeleteDeltaStartTimestamp().isEmpty() ||
+                    (isCompaction == true)) {
+              blockDetail
+                      .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
+            }
+            blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
+            blockDetail.setStatus(newBlockEntry.getStatus());
+            blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
+          } else {
+            // add the new details to the list.
+            oldList.add(newBlockEntry);
+          }
+        }
+
+        segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, updateStatusFileIdentifier);
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the segment update lock.");
+        status = false;
+      }
+    } catch (IOException e) {
+      status = false;
+    } finally {
+      if (lockStatus) {
+        if (updateLock.unlock()) {
+          LOGGER.info("Unlock the segment update lock successfull.");
+        } else {
+          LOGGER.error("Not able to unlock the segment update lock.");
+        }
+      }
+    }
+    return status;
+  }
+
+  /**
+   * @param updatedSegmentsList
+   * @param table
+   * @return
+   */
+  public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
+                                                  CarbonTable table, String updatedTimeStamp,
+                                                  boolean isTimestampUpdationRequired,
+                                                  List<String> segmentsToBeDeleted) {
+
+    boolean status = false;
+
+    String metaDataFilepath = table.getMetaDataFilepath();
+
+    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    boolean lockStatus = false;
+    try {
+      lockStatus = carbonLock.lockWithRetries();
+      if (lockStatus) {
+        LOGGER.info(
+                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                        + " for table status updation");
+
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+
+          if (isTimestampUpdationRequired) {
+            // we are storing the link between the 2 status files in the segment 0 only.
+            if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
+              loadMetadata.setUpdateStatusFileName(
+                      CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
+            }
+
+            // if the segments is in the list of marked for delete then update the status.
+            if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
+              loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+              loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
+            }
+          }
+          for (String segName : updatedSegmentsList) {
+            if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) {
+              // if this call is coming from the delete delta flow then the time stamp
+              // String will come empty then no need to write into table status file.
+              if (isTimestampUpdationRequired) {
+                loadMetadata.setIsDeleted(CarbonCommonConstants.KEYWORD_TRUE);
+                // if in case of update flow.
+                if (loadMetadata.getUpdateDeltaStartTimestamp().isEmpty()) {
+                  // this means for first time it is getting updated .
+                  loadMetadata.setUpdateDeltaStartTimestamp(updatedTimeStamp);
+                }
+                // update end timestamp for each time.
+                loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
+              }
+            }
+          }
+        }
+
+        try {
+          segmentStatusManager
+                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        } catch (IOException e) {
+          return false;
+        }
+
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
+                .getDatabaseName() + "." + table.getFactTableName());
+      }
+    } finally {
+      if (lockStatus) {
+        if (carbonLock.unlock()) {
+          LOGGER.info(
+                  "Table unlocked successfully after table status updation" + table.getDatabaseName()
+                          + "." + table.getFactTableName());
+        } else {
+          LOGGER.error(
+                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+                          .getFactTableName() + " during table status updation");
+        }
+      }
+    }
+    return status;
+
+  }
+
+  /**
+   * gets the file name of the update status file by appending the latest timestamp to it.
+   *
+   * @param updatedTimeStamp
+   * @return
+   */
+  public static String getUpdateStatusFileName(String updatedTimeStamp) {
+    return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN
+            + updatedTimeStamp;
+  }
+
+  /**
+   * This will handle the clean up cases if the update fails.
+   *
+   * @param table
+   * @param timeStamp
+   */
+  public static void cleanStaleDeltaFiles(CarbonTable table, final String timeStamp) {
+
+    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+    // as of now considering only partition 0.
+    String partitionId = "0";
+    String partitionDir = carbonTablePath.getPartitionDir(partitionId);
+    CarbonFile file =
+            FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
+    if(!file.exists()) {
+      return;
+    }
+    for (CarbonFile eachDir : file.listFiles()) {
+      // for each dir check if the file with the delta timestamp is present or not.
+      CarbonFile[] toBeDeleted = eachDir.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          String fileName = file.getName();
+          return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)
+                  || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+                  || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
+        }
+      });
+      // deleting the files of a segment.
+      try {
+        CarbonUtil.deleteFoldersAndFilesSilent(toBeDeleted);
+      } catch (CarbonUtilException e) {
+        LOGGER.error("Exception in deleting the delta files." + e);
+      }
+    }
+  }
+
+  /**
+   * returns timestamp as long value
+   *
+   * @param timtstamp
+   * @return
+   */
+  public static Long getTimeStampAsLong(String timtstamp) {
+    try {
+      long longValue = Long.parseLong(timtstamp);
+      return longValue;
+    } catch (NumberFormatException nfe) {
+      String errorMsg = "Invalid timestamp : " + timtstamp;
+      LOGGER.error(errorMsg);
+      return null;
+    }
+  }
+
+  /**
+   * returns integer value from given string
+   *
+   * @param value
+   * @return
+   * @throws Exception
+   */
+  public static Integer getIntegerValue(String value) throws Exception {
+    try {
+      int intValue = Integer.parseInt(value);
+      return intValue;
+    } catch (NumberFormatException nfe) {
+      LOGGER.error("Invalid row : " + value + nfe.getLocalizedMessage());
+      throw new Exception("Invalid row : " + nfe.getLocalizedMessage());
+    }
+  }
+
+  /**
+   * return only block name from completeBlockName
+   *
+   * @param completeBlockName
+   * @return
+   */
+  public static String getBlockName(String completeBlockName) {
+    String blockName =
+            completeBlockName.substring(0, completeBlockName.lastIndexOf(CarbonCommonConstants.HYPHEN));
+    return blockName;
+  }
+
+  /**
+   * returns segment id from segment name
+   *
+   * @param segmentName
+   * @return
+   */
+  public static String getSegmentId(String segmentName) {
+    String id = segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
+    return id;
+  }
+
+  public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
+    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+
+    // scan all the carbondata files and get the latest task ID.
+    CarbonFile segment =
+            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+    int max = 0;
+    if (null != dataFiles) {
+      for (CarbonFile file : dataFiles) {
+        int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
+        if (taskNumber > max) {
+          max = taskNumber;
+        }
+      }
+    }
+    // return max task No
+    return max;
+
+  }
+
+  public static String getLatestBlockNameForSegment(String segmentId, CarbonTablePath tablePath) {
+    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+
+    // scan all the carbondata files and get the latest task ID.
+    CarbonFile segment =
+            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        int max = 0;
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
+          if (taskNumber >= max) {
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+
+    // get the latest among the data files. highest task number will be at the last.
+    return dataFiles[dataFiles.length - 1].getName();
+  }
+
+  /**
+   * This method will convert a given timestamp to long value and then to string back
+   *
+   * @param factTimeStamp
+   * @return
+   */
+  public static String convertTimeStampToString(String factTimeStamp) {
+    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(factTimeStamp);
+      return Long.toString(dateToStr.getTime());
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * This method will convert a given timestamp to long value and then to string back
+   *
+   * @param factTimeStamp
+   * @return
+   */
+  public static long convertTimeStampToLong(String factTimeStamp) {
+    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+    Date dateToStr = null;
+    try {
+      dateToStr = parser.parse(factTimeStamp);
+      return dateToStr.getTime();
+    } catch (ParseException e) {
+      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
+      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+      try {
+        dateToStr = parser.parse(factTimeStamp);
+        return dateToStr.getTime();
+      } catch (ParseException e1) {
+        LOGGER
+                .error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e1.getMessage());
+        return 0;
+      }
+    }
+  }
+
+
+  /**
+   * Handling of the clean up of old carbondata files, index files , delte delta,
+   * update status files.
+   * @param table clean up will be handled on this table.
+   * @param forceDelete if true then max query execution timeout will not be considered.
+   */
+  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) {
+
+    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
+                    table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
+
+    LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
+
+    String validUpdateStatusFile = "";
+
+    // scan through each segment.
+
+    for (LoadMetadataDetails segment : details) {
+
+      // take the update status file name from 0th segment.
+      validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
+
+      // if this segment is valid then only we will go for delta file deletion.
+      // if the segment is mark for delete or compacted then any way it will get deleted.
+
+      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+              || segment.getLoadStatus()
+              .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+
+        // take the list of files from this segment.
+        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+        CarbonFile segDir =
+                FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+        CarbonFile[] allSegmentFiles = segDir.listFiles();
+
+        // scan through the segment and find the carbondatafiles and index files.
+        SegmentUpdateStatusManager updateStatusManager =
+                new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+
+        // get Invalid update  delta files.
+        CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
+                .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                        CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
+
+        // now for each invalid delta file need to check the query execution time out
+        // and then delete.
+
+        for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
+
+          compareTimestampsAndDelete(invalidFile, forceDelete, false);
+        }
+
+        // do the same for the index files.
+        CarbonFile[] invalidIndexFiles = updateStatusManager
+                .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                        CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
+
+        // now for each invalid index file need to check the query execution time out
+        // and then delete.
+
+        for (CarbonFile invalidFile : invalidIndexFiles) {
+
+          compareTimestampsAndDelete(invalidFile, forceDelete, false);
+        }
+
+        // now handle all the delete delta files which needs to be deleted.
+        // there are 2 cases here .
+        // 1. if the block is marked as compacted then the corresponding delta files
+        //    can be deleted if query exec timeout is done.
+        // 2. if the block is in success state then also there can be delete
+        //    delta compaction happened and old files can be deleted.
+
+        SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
+        for (SegmentUpdateDetails block : updateDetails) {
+          CarbonFile[] completeListOfDeleteDeltaFiles;
+          CarbonFile[] invalidDeleteDeltaFiles;
+
+          if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
+            continue;
+          }
+
+          // case 1
+          if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+            completeListOfDeleteDeltaFiles = updateStatusManager
+                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
+                            allSegmentFiles);
+            for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
+
+              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            }
+
+            CarbonFile[] blockRelatedFiles = updateStatusManager
+                    .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
+                            block.getActualBlockName());
+
+            // now for each invalid index file need to check the query execution time out
+            // and then delete.
+
+            for (CarbonFile invalidFile : blockRelatedFiles) {
+
+              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            }
+
+
+          } else {
+            invalidDeleteDeltaFiles = updateStatusManager
+                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
+                            allSegmentFiles);
+            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
+
+              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            }
+          }
+        }
+      }
+    }
+
+    // delete the update table status files which are old.
+    if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
+
+      final String updateStatusTimestamp = validUpdateStatusFile
+              .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
+
+      CarbonFile metaFolder = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath(),
+              FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
+
+      CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
+
+            // CHECK if this is valid or not.
+            // we only send invalid ones to delete.
+            if (!file.getName().endsWith(updateStatusTimestamp)) {
+              return true;
+            }
+          }
+          return false;
+        }
+      });
+
+      for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
+
+        compareTimestampsAndDelete(invalidFile, forceDelete, true);
+      }
+    }
+  }
+
+  /**
+   * This will tell whether the max query timeout has been expired or not.
+   * @param fileTimestamp
+   * @return
+   */
+  public static boolean isMaxQueryTimeoutExceeded(long fileTimestamp) {
+    // record current time.
+    long currentTime = CarbonUpdateUtil.readCurrentTime();
+    int maxTime;
+    try {
+      maxTime = Integer.parseInt(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+    } catch (NumberFormatException e) {
+      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+    }
+
+    long difference = currentTime - fileTimestamp;
+
+    long minutesElapsed = (difference / (1000 * 60));
+
+    return minutesElapsed > maxTime;
+
+  }
+
+  /**
+   *
+   * @param invalidFile
+   * @param forceDelete
+   * @param isUpdateStatusFile if true then the parsing of file name logic changes.
+   */
+  private static void compareTimestampsAndDelete(CarbonFile invalidFile,
+                                                 boolean forceDelete, boolean isUpdateStatusFile) {
+    long fileTimestamp = 0L;
+
+    if (isUpdateStatusFile) {
+      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName()
+              .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
+    } else {
+      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(
+              CarbonTablePath.DataFileUtil.getUpdateTimeStamp(invalidFile.getName()));
+    }
+
+    // if the timestamp of the file is more than the current time by query execution timeout.
+    // then delete that file.
+    if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+      // delete the files.
+      try {
+        LOGGER.info("deleting the invalid file : " + invalidFile.getName());
+        CarbonUtil.deleteFoldersAndFiles(invalidFile);
+      } catch (CarbonUtilException e) {
+        LOGGER.error("error in clean up of compacted files." + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   *
+   * @param blockStatus
+   * @return
+   */
+  public static boolean isBlockInvalid(String blockStatus) {
+    if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
+            .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+      return true;
+    }
+    return false;
+  }
+
  /**
   * This will return the current time in millis.
   *
   * Centralised here so every update/delete timestamp in this utility comes
   * from a single clock source.
   *
   * @return current wall-clock time in milliseconds since the epoch
   */
  public static long readCurrentTime() {
    return System.currentTimeMillis();
  }
+
+  /**
+   *
+   * @param details
+   * @param segmentBlockCount
+   */
+  public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
+                                                Map<String, Long> segmentBlockCount) {
+
+    String segId = details.getSegmentName();
+
+    segmentBlockCount.put(details.getSegmentName(), segmentBlockCount.get(segId) - 1);
+
+  }
+
+  /**
+   *
+   * @param segmentBlockCount
+   * @return
+   */
+  public static List<String> getListOfSegmentsToMarkDeleted( Map<String, Long> segmentBlockCount) {
+    List<String> segmentsToBeDeleted =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
+
+      if (eachSeg.getValue() == 0) {
+        segmentsToBeDeleted.add(eachSeg.getKey());
+      }
+
+    }
+    return segmentsToBeDeleted;
+  }
+
+  /**
+   *
+   * @param blockMappingVO
+   * @param segmentUpdateStatusManager
+   */
+  public static void createBlockDetailsMap(BlockMappingVO blockMappingVO,
+                                           SegmentUpdateStatusManager segmentUpdateStatusManager) {
+
+    Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
+
+    Map<String, RowCountDetailsVO> outputMap =
+            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
+      String key = blockRowEntry.getKey();
+      long alreadyDeletedCount = 0;
+
+      SegmentUpdateDetails detail = segmentUpdateStatusManager.getDetailsForABlock(key);
+
+      if (null != detail) {
+
+        alreadyDeletedCount = Long.parseLong(detail.getDeletedRowsInBlock());
+
+      }
+
+      RowCountDetailsVO rowCountDetailsVO =
+              new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
+      outputMap.put(key, rowCountDetailsVO);
+
+    }
+
+    blockMappingVO.setCompleteBlockRowDetailVO(outputMap);
+
+  }
+
+  /**
+   *
+   * @param segID
+   * @param blockName
+   * @return
+   */
+  public static String getSegmentBlockNameKey(String segID, String blockName) {
+
+    String blockNameWithOutPart = blockName
+            .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+                    blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
+
+    return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java b/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
index d4d9eb2..2a38ed8 100644
--- a/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
+++ b/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package com.huawei.carbondata.core.update;
+package org.apache.carbondata.core.update;
 
 /**
  * Enum class for tupleID.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
new file mode 100644
index 0000000..4961bc6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.update;
+
import java.io.Serializable;
import java.util.Objects;
+
+/**
+ * VO class for storing details related to Update operation.
+ */
+public class UpdateVO implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private Long factTimestamp;
+
+  private Long updateDeltaStartTimestamp;
+
+  public Long getLatestUpdateTimestamp() {
+    return latestUpdateTimestamp;
+  }
+
+  public void setLatestUpdateTimestamp(Long latestUpdateTimestamp) {
+    this.latestUpdateTimestamp = latestUpdateTimestamp;
+  }
+
+  private Long latestUpdateTimestamp;
+
+  public Long getFactTimestamp() {
+    return factTimestamp;
+  }
+
+  public void setFactTimestamp(Long factTimestamp) {
+    this.factTimestamp = factTimestamp;
+  }
+
+  public Long getUpdateDeltaStartTimestamp() {
+    return updateDeltaStartTimestamp;
+  }
+
+  public void setUpdateDeltaStartTimestamp(Long updateDeltaStartTimestamp) {
+    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    UpdateVO updateVO = (UpdateVO) o;
+    if (factTimestamp != null ?
+        !factTimestamp.equals(updateVO.factTimestamp) :
+        updateVO.factTimestamp != null) {
+      return false;
+    }
+    if (updateDeltaStartTimestamp != null ?
+        !updateDeltaStartTimestamp.equals(updateVO.updateDeltaStartTimestamp) :
+        updateVO.updateDeltaStartTimestamp != null) {
+      return false;
+    }
+    return latestUpdateTimestamp != null ?
+        latestUpdateTimestamp.equals(updateVO.latestUpdateTimestamp) :
+        updateVO.latestUpdateTimestamp == null;
+
+  }
+
+  @Override public int hashCode() {
+    int result = factTimestamp != null ? factTimestamp.hashCode() : 0;
+    result = 31 * result + (updateDeltaStartTimestamp != null ?
+        updateDeltaStartTimestamp.hashCode() :
+        0);
+    result = 31 * result + (latestUpdateTimestamp != null ? latestUpdateTimestamp.hashCode() : 0);
+    return result;
+  }
+
+  /**
+   * This will return the update timestamp if its present or it will return the fact timestamp.
+   * @return
+   */
+  public Long getCreatedOrUpdatedTimeStamp() {
+    if (null == latestUpdateTimestamp) {
+      return factTimestamp;
+    }
+    return latestUpdateTimestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
new file mode 100644
index 0000000..c400637
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.updatestatus;
+
+import com.google.gson.Gson;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.fileoperations.FileWriteOperation;
+import org.apache.carbondata.locks.CarbonLockFactory;
+import org.apache.carbondata.locks.CarbonLockUtil;
+import org.apache.carbondata.locks.ICarbonLock;
+import org.apache.carbondata.locks.LockUsage;
+import org.apache.hadoop.security.AccessControlException;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+/**
+ * Manages Load/Segment status
+ */
+public class SegmentStatusManager {
+
  private static final LogService LOG =
      LogServiceFactory.getLogService(SegmentStatusManager.class.getName());

  // identifies the table (store path + database/table name) whose load and
  // segment status this manager operates on.
  private AbsoluteTableIdentifier absoluteTableIdentifier;

  /**
   * @param absoluteTableIdentifier table whose load/segment status is managed
   */
  public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) {
    this.absoluteTableIdentifier = absoluteTableIdentifier;
  }
+
+  /**
+   * This will return the lock object used to lock the table status file before updation.
+   *
+   * @return
+   */
+  public ICarbonLock getTableStatusLock() {
+    return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+            LockUsage.TABLE_STATUS_LOCK);
+  }
+
+  /**
+   * This method will return last modified time of tablestatus file
+   */
+  public long getTableStatusLastModifiedTime() throws IOException {
+    String tableStatusPath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
+    if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
+      return 0L;
+    } else {
+      return FileFactory.getCarbonFile(tableStatusPath, FileFactory.getFileType(tableStatusPath))
+              .getLastModifiedTime();
+    }
+  }
+
+  public List<Long> getUpdateDeltaStartEndTimeStamp(final String loadIds,
+                                                    LoadMetadataDetails[] listOfLoadFolderDetailsArray) {
+    List<Long> updateDeltaStartEndTimestamp = new ArrayList<>();
+    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+      if (loadIds.equalsIgnoreCase(loadMetadata.getLoadName())) {
+        // Make sure the Load is not compacted and not marked for delete.
+        if (CarbonCommonConstants.COMPACTED
+                .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+          return null;
+        } else if (CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
+          return null;
+        }
+        else {
+          return updateDeltaStartEndTimestamp;
+        }
+      }
+    }
+    return null;
+  }
+
+
  /**
   * get valid segment for given table
   *
   * Reads the table status file and partitions the load metadata into: valid
   * segments (SUCCESS / MARKED_FOR_UPDATE / PARTIAL_SUCCESS, with merged loads
   * resolved to their merged name), the subset of valid segments that carry
   * updates, and invalid segments (FAILURE / COMPACTED / MARKED_FOR_DELETE).
   *
   * @return VO carrying the valid, valid-updated and invalid segment lists
   * @throws IOException when the table status file cannot be read or closed
   */
  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {

    // @TODO: move reading LoadStatus file to separate class
    List<String> listOfValidSegments = new ArrayList<String>(10);
    List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
    List<String> listOfInvalidSegments = new ArrayList<String>(10);
    CarbonTablePath carbonTablePath = CarbonStorePath
            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
                    absoluteTableIdentifier.getCarbonTableIdentifier());
    String dataPath = carbonTablePath.getTableStatusFilePath();
    DataInputStream dataInputStream = null;
    Gson gsonObjectToRead = new Gson();
    // atomic read so a concurrent status-file writer cannot hand us a torn file.
    AtomicFileOperations fileOperation =
            new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
    LoadMetadataDetails[] loadFolderDetailsArray;
    try {
      // a missing status file simply means no segments exist yet.
      if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {

        dataInputStream = fileOperation.openForRead();

        BufferedReader buffReader =
                new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));

        loadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
        //just directly iterate Array
        List<LoadMetadataDetails> loadFolderDetails = Arrays.asList(loadFolderDetailsArray);

        for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) {
          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
                  || CarbonCommonConstants.MARKED_FOR_UPDATE
                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
                  || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
            // check for merged loads.
            if (null != loadMetadataDetails.getMergedLoadName()) {
              // the merged name may appear on several source loads; add it once.
              if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
                listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
              }
              // if merged load is updated then put it in updated list
              if (CarbonCommonConstants.MARKED_FOR_UPDATE
                      .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
                listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
              }
              continue;
            }

            if (CarbonCommonConstants.MARKED_FOR_UPDATE
                    .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {

              listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
            }
            listOfValidSegments.add(loadMetadataDetails.getLoadName());
          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
                  || CarbonCommonConstants.COMPACTED
                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
                  || CarbonCommonConstants.MARKED_FOR_DELETE
                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
          }

        }
      }
    } catch (IOException e) {
      LOG.error(e);
      throw e;
    } finally {
      try {

        if (null != dataInputStream) {
          dataInputStream.close();
        }
      } catch (Exception e) {
        LOG.error(e);
        throw e;
      }

    }
    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
            listOfInvalidSegments);
  }
+
  /**
   * This method reads the load metadata file
   *
   * Returns an empty array — never null — both when the metadata file does not
   * exist yet and when reading it fails (the IOException is deliberately
   * swallowed so callers can treat "no metadata" uniformly).
   *
   * @param tableFolderPath folder containing the load metadata file
   * @return parsed load details; empty array on missing file or read failure
   */
  public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
    Gson gsonObjectToRead = new Gson();
    DataInputStream dataInputStream = null;
    BufferedReader buffReader = null;
    InputStreamReader inStream = null;
    String metadataFileName = tableFolderPath + CarbonCommonConstants.FILE_SEPARATOR
        + CarbonCommonConstants.LOADMETADATA_FILENAME;
    LoadMetadataDetails[] listOfLoadFolderDetailsArray;

    // atomic read so a concurrent status-file writer cannot hand us a torn file.
    AtomicFileOperations fileOperation =
        new AtomicFileOperationsImpl(metadataFileName, FileFactory.getFileType(metadataFileName));

    try {
      if (!FileFactory.isFileExist(metadataFileName, FileFactory.getFileType(metadataFileName))) {
        return new LoadMetadataDetails[0];
      }
      dataInputStream = fileOperation.openForRead();
      inStream = new InputStreamReader(dataInputStream,
              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
      buffReader = new BufferedReader(inStream);
      listOfLoadFolderDetailsArray =
          gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
    } catch (IOException e) {
      return new LoadMetadataDetails[0];
    } finally {
      closeStreams(buffReader, inStream, dataInputStream);
    }

    return listOfLoadFolderDetailsArray;
  }
+
  /**
   * compares two given date values (epoch millis)
   *
   * Thin wrapper over Long.compareTo, kept so date comparisons in this class
   * go through one place.
   *
   * @param loadValue load timestamp in epoch millis
   * @param userValue user-supplied timestamp in epoch millis
   * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
   * 0 otherwise
   */
  private static Integer compareDateValues(Long loadValue, Long userValue) {
    return loadValue.compareTo(userValue);
  }
+
  /**
   * updates deletion status
   *
   * Marks the given load ids for delete in the table status file. Two locks are
   * taken: the delete-segment lock serialises concurrent delete-segment
   * operations, and the table-status lock is taken only around the re-read and
   * write of the status file so concurrent writers cannot clobber each other.
   *
   * @param identifier      table whose segments are to be deleted
   * @param loadIds         load ids requested for deletion
   * @param tableFolderPath metadata folder of the table
   * @return the subset of loadIds that could NOT be deleted (empty on success);
   *         the full loadIds list when the metadata file is missing or empty
   * @throws Exception when either lock cannot be acquired
   */
  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
      List<String> loadIds, String tableFolderPath) throws Exception {
    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
    ICarbonLock carbonDeleteSegmentLock =
        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
    ICarbonLock carbonTableStatusLock =
        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK);
    String tableDetails =
        carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName();
    List<String> invalidLoadIds = new ArrayList<String>(0);
    try {
      if (carbonDeleteSegmentLock.lockWithRetries()) {
        LOG.info("Delete segment lock has been successfully acquired");

        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
        String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
        if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
          // log error.
          LOG.error("Load metadata file is not present.");
          return loadIds;
        }
        // read existing metadata details in load metadata.
        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
        if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
          updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
          if (invalidLoadIds.isEmpty()) {
            // All or None , if anything fails then dont write
            if(carbonTableStatusLock.lockWithRetries()) {
              LOG.info("Table status lock has been successfully acquired");
              // To handle concurrency scenarios, always take latest metadata before writing
              // into status file.
              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
                  latestLoadMetadataDetails);
              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
            }
            else {
              String errorMsg = "Delete segment by id is failed for " + tableDetails
                  + ". Not able to acquire the table status lock due to other operation running "
                  + "in the background.";
              LOG.audit(errorMsg);
              LOG.error(errorMsg);
              throw new Exception(errorMsg + " Please try after some time.");
            }

          } else {
            // at least one requested load id was invalid: report without writing.
            return invalidLoadIds;
          }

        } else {
          LOG.audit("Delete segment by Id is failed. No matching segment id found.");
          return loadIds;
        }

      } else {
        String errorMsg = "Delete segment by id is failed for " + tableDetails
            + ". Not able to acquire the delete segment lock due to another delete "
            + "operation is running in the background.";
        LOG.audit(errorMsg);
        LOG.error(errorMsg);
        throw new Exception(errorMsg + " Please try after some time.");
      }
    } catch (IOException e) {
      LOG.error("IOException" + e.getMessage());
    } finally {
      // both locks are released unconditionally, in reverse acquisition order.
      CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
    }

    return invalidLoadIds;
  }
+
+  /**
+   * Marks every segment whose load start time precedes {@code loadStartTime} as
+   * MARKED_FOR_DELETE and persists the updated table status file, guarded by the
+   * delete-segment lock and the table-status lock.
+   *
+   * @param identifier      absolute identifier of the table being modified
+   * @param loadDate        user-supplied date string; reported back as invalid on failure
+   * @param tableFolderPath metadata folder of the table (location of the status file)
+   * @param loadStartTime   cutoff; segments that started loading before this are deleted
+   * @return list of invalid load timestamps (empty on success; contains loadDate when no
+   *         segment matched or the metadata file is missing)
+   * @throws Exception when either required lock cannot be acquired
+   */
+  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
+      String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonDeleteSegmentLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
+    ICarbonLock carbonTableStatusLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK);
+    String tableDetails =
+        carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName();
+    List<String> invalidLoadTimestamps = new ArrayList<String>(0);
+    try {
+      // The delete-segment lock serializes concurrent delete operations on this table.
+      if (carbonDeleteSegmentLock.lockWithRetries()) {
+        LOG.info("Delete segment lock has been successfully acquired");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
+        String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+
+        if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
+          // log error.
+          LOG.error("Error message: " + "Load metadata file is not present.");
+          invalidLoadTimestamps.add(loadDate);
+          return invalidLoadTimestamps;
+        }
+        // read existing metadata details in load metadata.
+        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+        if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
+          // Mutates listOfLoadFolderDetailsArray in place; collects failures in
+          // invalidLoadTimestamps.
+          updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps,
+              loadStartTime);
+          if (invalidLoadTimestamps.isEmpty()) {
+            // The table-status lock is taken only for the write, to keep the critical
+            // section small.
+            if(carbonTableStatusLock.lockWithRetries()) {
+              LOG.info("Table status lock has been successfully acquired.")
+              // To handle concurrency scenarios, always take latest metadata before writing
+              // into status file.
+              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
+              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
+                  latestLoadMetadataDetails);
+              // NOTE(review): the OLD array (with deletions applied) is written here, not the
+              // merged latest details returned above - confirm concurrent status changes made
+              // between the two reads are not meant to be preserved.
+              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
+            }
+            else {
+
+              String errorMsg = "Delete segment by date is failed for " + tableDetails
+                  + ". Not able to acquire the table status lock due to other operation running "
+                  + "in the background.";
+              LOG.audit(errorMsg);
+              LOG.error(errorMsg);
+              throw new Exception(errorMsg + " Please try after some time.");
+
+            }
+          } else {
+            return invalidLoadTimestamps;
+          }
+
+        } else {
+          LOG.audit("Delete segment by date is failed. No matching segment found.");
+          invalidLoadTimestamps.add(loadDate);
+          return invalidLoadTimestamps;
+        }
+
+      } else {
+        String errorMsg = "Delete segment by date is failed for " + tableDetails
+            + ". Not able to acquire the delete segment lock due to another delete "
+            + "operation is running in the background.";
+        LOG.audit(errorMsg);
+        LOG.error(errorMsg);
+        throw new Exception(errorMsg + " Please try after some time.");
+      }
+    } catch (IOException e) {
+      // NOTE(review): IO failures are only logged; the method then falls through and
+      // returns invalidLoadTimestamps as if the delete succeeded - confirm intentional.
+      LOG.error("Error message: " + "IOException" + e.getMessage());
+    } finally {
+      // Unlock in reverse acquisition order; CarbonLockUtil tolerates never-acquired locks.
+      CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
+      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
+    }
+
+    return invalidLoadTimestamps;
+  }
+
+  /**
+   * Serializes the given load metadata entries as JSON and atomically overwrites the
+   * table status file at {@code dataLoadLocation}.
+   *
+   * @param dataLoadLocation             path of the table status file to (re)write
+   * @param listOfLoadFolderDetailsArray metadata entries to persist
+   * @throws IOException when the write fails; previously this was swallowed after
+   *         logging, so callers wrongly assumed the status file had been updated
+   */
+  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
+    AtomicFileOperations fileWrite =
+        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    // write the updated data into the metadata file.
+
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray);
+      brWriter.write(metadataInstance);
+    } catch (IOException ioe) {
+      LOG.error("Error message: " + ioe.getLocalizedMessage());
+      // Propagate the failure: the method is declared to throw IOException and callers
+      // must not treat a failed status-file write as success.
+      throw ioe;
+    } finally {
+      // Flush before close so buffered JSON reaches the temp file that the atomic
+      // operation promotes on close.
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
+    }
+
+  }
+
+  /**
+   * Marks each requested load id as MARKED_FOR_DELETE in the given metadata array.
+   * Processing aborts on the first id that cannot be deleted, and that id is
+   * reported in {@code invalidLoadIds}.
+   *
+   * @param loadIds                      segment ids requested for deletion
+   * @param listOfLoadFolderDetailsArray current load metadata entries, mutated in place
+   * @param invalidLoadIds               collector for ids that cannot be deleted
+   * @return invalidLoadIds (empty when every requested id was marked successfully)
+   */
+  private static List<String> updateDeletionStatus(List<String> loadIds,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) {
+    for (String loadId : loadIds) {
+      boolean loadFound = false;
+      // For each load id loop through data and if the load id is found then mark
+      // the metadata as deleted.
+      for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+
+        if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
+          // A compacted segment's rows live in the merged segment, so it must not be deleted.
+          if (CarbonCommonConstants.COMPACTED
+                  .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+            LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
+            invalidLoadIds.add(loadId);
+            return invalidLoadIds;
+          }
+          // equalsIgnoreCase for consistency with every other status comparison in this class.
+          if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(
+              loadMetadata.getLoadStatus())) {
+            loadFound = true;
+            loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+            loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
+            LOG.info("Segment ID " + loadId + " Marked for Delete");
+          }
+          break;
+        }
+      }
+
+      // NOTE(review): a segment that exists but is already marked for delete also lands
+      // here and is audited as "no matching segment id" - confirm that is intended.
+      if (!loadFound) {
+        LOG.audit("Delete segment by ID is failed. No matching segment id found :" + loadId);
+        invalidLoadIds.add(loadId);
+        return invalidLoadIds;
+      }
+
+    }
+    return invalidLoadIds;
+  }
+
+  /**
+   * Marks every segment whose load start time precedes {@code loadStartTime} as
+   * MARKED_FOR_DELETE. Compacted segments are skipped (their data lives in the
+   * merged segment).
+   *
+   * @param loadDate                     user-supplied date string, reported back when
+   *                                     no segment qualified
+   * @param listOfLoadFolderDetailsArray load metadata entries, mutated in place
+   * @param invalidLoadTimestamps        collector for the failed timestamp
+   * @param loadStartTime                cutoff time for deletion
+   * @return invalidLoadTimestamps (empty when at least one segment was marked)
+   */
+  public static List<String> updateDeletionStatus(String loadDate,
+      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
+      Long loadStartTime) {
+    // For each load timestamp loop through data and if the
+    // required load timestamp is found then mark
+    // the metadata as deleted.
+    boolean loadFound = false;
+    String loadStartTimeString = "Load Start Time: ";
+    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+      // Primitive int avoids needless autoboxing of the comparison result.
+      int result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
+      if (result < 0) {
+        if (CarbonCommonConstants.COMPACTED
+            .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+          // Fixed message: the original lacked a space, logging "<name>as the segment...".
+          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+              + " as the segment has been compacted.");
+          continue;
+        }
+        if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
+          loadFound = true;
+          updateSegmentMetadataDetails(loadMetadata);
+          LOG.info("Info: " +
+              loadStartTimeString + loadMetadata.getLoadStartTime() +
+              " Marked for Delete");
+        }
+      }
+
+    }
+
+    if (!loadFound) {
+      invalidLoadTimestamps.add(loadDate);
+      LOG.audit("Delete segment by date is failed. No matching segment found.");
+      return invalidLoadTimestamps;
+    }
+    return invalidLoadTimestamps;
+  }
+
+  /**
+   * Closes each supplied stream, logging (and otherwise ignoring) individual
+   * close failures.
+   *
+   * @param streams streams to close; the varargs array or any element may be null
+   */
+  private static void closeStreams(Closeable... streams) {
+    // A null varargs array is possible when the caller passes an explicit null.
+    if (null == streams) {
+      return;
+    }
+    for (Closeable stream : streams) {
+      if (null == stream) {
+        continue;
+      }
+      try {
+        stream.close();
+      } catch (IOException e) {
+        LOG.error("Error while closing stream" + stream);
+      }
+    }
+  }
+
+  /**
+   * updates table status details using latest metadata
+   *
+   * @param oldMetadata
+   * @param newMetadata
+   * @return
+   */
+
+  public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
+      LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
+
+    List<LoadMetadataDetails> newListMetadata =
+        new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata));
+    for (LoadMetadataDetails oldSegment : oldMetadata) {
+      if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oldSegment.getLoadStatus())) {
+        updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment)));
+      }
+    }
+    return newListMetadata;
+  }
+
+  /**
+   * Marks the given segment as MARKED_FOR_DELETE and stamps its modification/deletion
+   * time, unless it already carries that status.
+   *
+   * @param loadMetadata segment metadata entry, updated in place
+   */
+  public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
+    // Already marked for delete - nothing to change.
+    if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+      return;
+    }
+    loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+    loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
+  }
+
+  /**
+   * This API will return the update status file name.
+   * @param segmentList
+   * @return
+   */
+  public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
+    if(segmentList.length == 0) {
+      return "";
+    }
+    else {
+      for(LoadMetadataDetails eachSeg : segmentList) {
+        // file name stored in 0th segment.
+        if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
+          return eachSeg.getUpdateStatusFileName();
+        }
+      }
+    }
+    return "";
+  }
+
+  /**
+   * Collects the task numbers of the update delta files present in a segment.
+   *
+   * @param segmentId           id of the segment to inspect
+   * @param updateStatusManager manager used to list the segment's update delta files
+   * @return task numbers extracted from each update delta file name
+   */
+  public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
+          updateStatusManager) {
+    List<String> taskNumbers = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (String deltaFileName : updateStatusManager.getUpdateDeltaFiles(segmentId)) {
+      taskNumbers.add(CarbonTablePath.DataFileUtil.getTaskNo(deltaFileName));
+    }
+    return taskNumbers;
+  }
+
+
+  /**
+   * Immutable holder for the valid, valid-updated and invalid segment lists of a table.
+   */
+  public static class ValidAndInvalidSegmentsInfo {
+    private final List<String> listOfValidSegments;
+    private final List<String> listOfValidUpdatedSegments;
+    private final List<String> listOfInvalidSegments;
+
+    private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
+        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
+      this.listOfValidSegments = listOfValidSegments;
+      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
+      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
+    }
+
+    /** @return segments that are invalid (e.g. deleted or compacted away). */
+    public List<String> getInvalidSegments() {
+      return listOfInvalidSegments;
+    }
+
+    /** @return segments that are valid for query. */
+    public List<String> getValidSegments() {
+      return listOfValidSegments;
+    }
+
+    /**
+     * @return valid segments that carry updates; this field was previously stored
+     * but never exposed through any accessor.
+     */
+    public List<String> getValidUpdatedSegments() {
+      return listOfValidUpdatedSegments;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 9e6d8b0..f10d64f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -19,17 +19,17 @@
 
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Properties;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
 public final class CarbonProperties {
   /**
    * Attribute for Carbon LOGGER.
@@ -546,5 +546,56 @@ public final class CarbonProperties {
         String.valueOf(executorStartUpTimeOut));
     LOGGER.info("Executor start up wait time: " + executorStartUpTimeOut);
   }
+  /**
+   * Reads the threshold on the number of update delta files allowed before IUD
+   * (horizontal) compaction is triggered.
+   *
+   * @return the configured threshold, or the default when the value is missing,
+   *         non-numeric, or outside the 0-10000 range
+   */
+  public int getNoUpdateDeltaFilesThresholdForIUDCompaction() {
+    int numberOfDeltaFilesThreshold;
+    try {
+      numberOfDeltaFilesThreshold = Integer.parseInt(
+          getProperty(CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
+              CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
+
+      if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
+        // Leading space added so the property name is not fused with "is incorrect.".
+        LOGGER.error("The specified value for property "
+            + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+            + " is incorrect."
+            + " Correct value should be in range of 0 -10000. Taking the default value.");
+        numberOfDeltaFilesThreshold = Integer.parseInt(
+            CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("The specified value for property "
+          + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+          + " is incorrect."
+          + " Correct value should be in range of 0 -10000. Taking the default value.");
+      numberOfDeltaFilesThreshold = Integer
+          .parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
+    }
+    return numberOfDeltaFilesThreshold;
+  }
+
+  /**
+   * Reads the threshold on the number of delete delta files allowed before IUD
+   * (horizontal) compaction is triggered.
+   *
+   * @return the configured threshold, or the default when the value is missing,
+   *         non-numeric, or outside the 0-10000 range
+   */
+  public int getNoDeleteDeltaFilesThresholdForIUDCompaction() {
+    int numberOfDeltaFilesThreshold;
+    try {
+      numberOfDeltaFilesThreshold = Integer.parseInt(
+          getProperty(CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
+              CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
+
+      if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
+        // Leading space added so the property name is not fused with "is incorrect.".
+        LOGGER.error("The specified value for property "
+            + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+            + " is incorrect."
+            + " Correct value should be in range of 0 -10000. Taking the default value.");
+        numberOfDeltaFilesThreshold = Integer.parseInt(
+            CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.error("The specified value for property "
+          + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
+          + " is incorrect."
+          + " Correct value should be in range of 0 -10000. Taking the default value.");
+      numberOfDeltaFilesThreshold = Integer
+          .parseInt(CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
+    }
+    return numberOfDeltaFilesThreshold;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/fileoperations/FileWriteOperation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/fileoperations/FileWriteOperation.java b/core/src/main/java/org/apache/carbondata/fileoperations/FileWriteOperation.java
new file mode 100644
index 0000000..81502f2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/fileoperations/FileWriteOperation.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.fileoperations;
+
+/**
+ * Write modes accepted when opening a file for writing (see the atomic file
+ * operations): APPEND keeps existing content, OVERWRITE replaces it.
+ */
+public enum FileWriteOperation {
+
+  APPEND, OVERWRITE
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/locks/CarbonLockFactory.java
new file mode 100644
index 0000000..43e743a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/CarbonLockFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Lock factory that hands out {@link ICarbonLock} instances of the configured type
+ * (LOCAL, ZOOKEEPER or HDFS). Using the returned lock object a client can request
+ * the lock and unlock.
+ */
+public class CarbonLockFactory {
+
+  /**
+   * Lock type read once from carbon properties at class-load time, stored already
+   * upper-cased so each factory call avoids re-normalizing it.
+   */
+  private static String lockTypeConfigured;
+
+  static {
+    CarbonLockFactory.initLockTypeConfigured();
+  }
+
+  /**
+   * Creates a lock of the configured type for the given table's resource.
+   *
+   * @param tableIdentifier table whose resource is to be locked
+   * @param lockFile        name of the lock file
+   * @return a lock instance matching the configured lock type
+   * @throws UnsupportedOperationException when the configured type is unknown
+   */
+  public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier,
+      String lockFile) {
+    switch (lockTypeConfigured) {
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+        return new LocalFileLock(tableIdentifier, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+        return new ZooKeeperLocking(tableIdentifier, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+        return new HdfsFileLock(tableIdentifier, lockFile);
+
+      default:
+        throw new UnsupportedOperationException("Not supported the lock type");
+    }
+  }
+
+  /**
+   * Creates a lock of the configured type at an explicit lock-file location.
+   *
+   * @param locFileLocation directory where the lock file lives
+   * @param lockFile        name of the lock file
+   * @return a lock instance matching the configured lock type
+   * @throws UnsupportedOperationException when the configured type is unknown
+   */
+  public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) {
+    switch (lockTypeConfigured) {
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+        return new LocalFileLock(locFileLocation, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+        return new ZooKeeperLocking(locFileLocation, lockFile);
+
+      case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+        return new HdfsFileLock(locFileLocation, lockFile);
+
+      default:
+        throw new UnsupportedOperationException("Not supported the lock type");
+    }
+  }
+
+  /**
+   * Reads the configured lock type from carbon properties and normalizes it to upper
+   * case. Renamed from getLockTypeConfigured, which read like an accessor although
+   * it mutates static state.
+   */
+  private static void initLockTypeConfigured() {
+    lockTypeConfigured = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.LOCK_TYPE_DEFAULT)
+        .toUpperCase();
+  }
+
+}


Mime
View raw message