carbondata-commits mailing list archives

From gvram...@apache.org
Subject [10/22] incubator-carbondata git commit: IUD delete flow support
Date Fri, 06 Jan 2017 13:57:10 GMT
IUD delete flow support


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3e045d83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3e045d83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3e045d83

Branch: refs/heads/master
Commit: 3e045d8395c094882a5880a57dd617c55130a18e
Parents: 0d42f52
Author: ManoharVanam <manohar.crazy09@gmail.com>
Authored: Mon Jan 2 11:23:34 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 6 19:16:29 2017 +0530

----------------------------------------------------------------------
 .../core/carbon/path/CarbonTablePath.java       | 189 +++-
 .../core/constants/CarbonCommonConstants.java   | 107 +-
 .../reader/CarbonDeleteDeltaFileReader.java     |  43 +
 .../reader/CarbonDeleteDeltaFileReaderImpl.java | 143 +++
 .../reader/CarbonDeleteFilesDataReader.java     | 160 +++
 .../core/update/DeleteDeltaBlockDetails.java    |  95 ++
 .../core/update/DeleteDeltaBlockletDetails.java |  85 ++
 .../core/update/SegmentUpdateDetails.java       | 172 ++++
 .../SegmentUpdateStatusManager.java             | 968 +++++++++++++++++++
 .../core/writer/CarbonDeleteDeltaWriter.java    |  38 +
 .../writer/CarbonDeleteDeltaWriterImpl.java     | 120 +++
 .../fileoperations/AtomicFileOperations.java    |  33 +
 .../AtomicFileOperationsImpl.java               |  87 ++
 .../carbondata/locks/AbstractCarbonLock.java    |  77 ++
 .../apache/carbondata/locks/CarbonLockUtil.java |  64 ++
 .../apache/carbondata/locks/ICarbonLock.java    |  40 +
 .../org/apache/carbondata/locks/LockUsage.java  |  36 +
 .../carbondata/locks/ZooKeeperLocking.java      | 195 ++++
 .../carbondata/spark/util/LoadMetadataUtil.java |   5 +-
 .../execution/command/carbonTableSchema.scala   |  18 +-
 .../execution/command/carbonTableSchema.scala   | 115 ++-
 .../dataretention/DataRetentionTestCase.scala   |   6 +-
 22 files changed, 2779 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index e2d49c9..43c274a 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -18,17 +18,16 @@
  */
 package org.apache.carbondata.core.carbon.path;
 
-import java.io.File;
-
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.hadoop.fs.Path;
 
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.INVALID_SEGMENT_ID;
+import java.io.File;
 
-import org.apache.hadoop.fs.Path;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.INVALID_SEGMENT_ID;
 
 /**
  * Helps to get Table content paths.
@@ -41,12 +40,16 @@ public class CarbonTablePath extends Path {
   protected static final String SORT_INDEX_EXT = ".sortindex";
   protected static final String SCHEMA_FILE = "schema";
   protected static final String TABLE_STATUS_FILE = "tablestatus";
+  protected static final String TABLE_UPDATE_STATUS_FILE = "tableupdatestatus";
   protected static final String FACT_DIR = "Fact";
   protected static final String SEGMENT_PREFIX = "Segment_";
   protected static final String PARTITION_PREFIX = "Part";
   protected static final String CARBON_DATA_EXT = ".carbondata";
-  protected static final String DATA_PART_PREFIX = "part";
+  protected static final String CARBON_DELTE_DELTA_EXT = ".deletedelta";
+  protected static final String CARBON_UPDATE_DELTA_EXT = ".updatedelta";
+  protected static final String DATA_PART_PREFIX = "part-";
   protected static final String INDEX_FILE_EXT = ".carbonindex";
+  protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
 
   protected String tablePath;
   protected CarbonTableIdentifier carbonTableIdentifier;
@@ -108,6 +111,20 @@ public class CarbonTablePath extends Path {
     }
     return false;
   }
+  /**
+   * check whether the given file is a carbon data file or an update delta file by extension
+   *
+   * @param fileNameWithPath file name with path
+   * @return true if the extension matches the carbon data or update delta extension
+   */
+  public static boolean isCarbonDataFileOrUpdateFile(String fileNameWithPath) {
+    int pos = fileNameWithPath.lastIndexOf('.');
+    if (pos != -1) {
+      return fileNameWithPath.substring(pos).startsWith(CARBON_DATA_EXT) || fileNameWithPath
+          .substring(pos).startsWith(CARBON_UPDATE_DELTA_EXT);
+    }
+    return false;
+  }
 
   /**
    * check if it is carbon index file matching extension
@@ -205,6 +222,13 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * @return absolute path of table update status file
+   */
+  public String getTableUpdateStatusFilePath() {
+    return getMetaDataDir() + File.separator + TABLE_UPDATE_STATUS_FILE;
+  }
+
+  /**
    * Gets absolute path of data file
    *
    * @param partitionId         unique partition identifier
@@ -240,7 +264,84 @@ public class CarbonTablePath extends Path {
             .endsWith(INDEX_FILE_EXT);
       }
     });
-    return files[0].getAbsolutePath();
+    if (files.length > 0) {
+      return files[0].getAbsolutePath();
+    } else {
+      throw new RuntimeException("Missing Carbon index file for partition["
+          + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId
+          + "]");
+    }
+  }
+  /**
+   * Below method will be used to get the index file present in the segment folder
+   * based on task id
+   *
+   * @param taskId      task id of the file
+   * @param partitionId partition number
+   * @param segmentId   segment number
+   * @return full qualified carbon index path
+   */
+  public String getCarbonUpdatedIndexFilePath(final String taskId, final String partitionId,
+      final String segmentId) {
+    String segmentDir = getSegmentDir(partitionId, segmentId);
+    CarbonFile carbonFile =
+        FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
+
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().startsWith(taskId) && file.getName().endsWith(INDEX_FILE_EXT);
+      }
+    });
+    if (files.length > 0) {
+      return files[0].getAbsolutePath();
+    } else {
+      throw new RuntimeException(
+          "Missing Carbon Updated index file for partition[" + partitionId
+              + "] Segment[" + segmentId + "], taskId[" + taskId + "]");
+    }
+  }
+
+  /**
+   * Below method will be used to get the delete delta file present in the segment folder
+   * based on task id
+   *
+   * @param taskId      task id of the file
+   * @param partitionId partition number
+   * @param segmentId   segment number
+   * @return fully qualified delete delta file path
+   */
+  public String getCarbonDeleteDeltaFilePath(final String taskId, final String partitionId,
+      final String segmentId) {
+    String segmentDir = getSegmentDir(partitionId, segmentId);
+    CarbonFile carbonFile =
+        FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
+
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().startsWith(taskId) && file.getName().endsWith(DELETE_DELTA_FILE_EXT);
+      }
+    });
+    if (files.length > 0) {
+      return files[0].getAbsolutePath();
+    } else {
+      throw new RuntimeException(
+          "Missing Carbon delete delta file index file for partition["
+              + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId
+              + "]");
+    }
+  }
+
+  public CarbonFile[] getCarbonDeleteDeltaFile(final String segmentId, final String partitionId) {
+    String segmentDir = getSegmentDir(partitionId, segmentId);
+    CarbonFile carbonFile =
+        FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
+
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(DELETE_DELTA_FILE_EXT);
+      }
+    });
+    return files;
   }
 
   /**
@@ -279,6 +380,18 @@ public class CarbonTablePath extends Path {
     return taskNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
   }
 
+  /**
+   * Below method will be used to get the carbon index filename
+   *
+   * @param taskNo               task number
+   * @param factUpdatedTimeStamp time stamp
+   * @return filename
+   */
+  public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp,
+      String indexFileExtension) {
+    return taskNo + "-" + factUpdatedTimeStamp + indexFileExtension;
+  }
+
   private String getSegmentDir(String partitionId, String segmentId) {
     return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId;
   }
@@ -327,6 +440,37 @@ public class CarbonTablePath extends Path {
     }
 
     /**
+     * gets updated timestamp information from given carbon index file name
+     */
+    public static String getUpdateTimeStampFromIndexFile(String carbonIndexFileName) {
+      // Get the file name from path
+      String fileName = getFileName(carbonIndexFileName);
+      int startIndex = fileName.indexOf("-") + 1;
+      int endIndex = fileName.indexOf(".");
+      return fileName.substring(startIndex, endIndex);
+    }
+
+    /**
+     * This will return the timestamp present in the delete delta file name.
+     * @param fileName
+     * @return
+     */
+    public static String getTimeStampFromDeleteDeltaFile(String fileName) {
+      return fileName.substring(fileName.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+          fileName.lastIndexOf("."));
+    }
+
+    /**
+     * This will return the block name present in the delete delta file name.
+     * @param fileName
+     * @return
+     */
+    public static String getBlockNameFromDeleteDeltaFile(String fileName) {
+      return fileName.substring(0,
+          fileName.lastIndexOf(CarbonCommonConstants.HYPHEN));
+    }
+
+    /**
      * gets file part number information from given carbon data file name
      */
     public static String getPartNo(String carbonDataFileName) {
@@ -380,6 +524,21 @@ public class CarbonTablePath extends Path {
         return carbonDataFileName;
       }
     }
+
+    /**
+     * Gets the file name of the delta files.
+     *
+     * @param filePartNo
+     * @param taskNo
+     * @param factUpdateTimeStamp
+     * @param Extension
+     * @return
+     */
+    public static String getCarbonDeltaFileName(String filePartNo, String taskNo,
+        String factUpdateTimeStamp, String Extension) {
+      return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + factUpdateTimeStamp
+          + Extension;
+    }
   }
 
   /**
@@ -440,7 +599,23 @@ public class CarbonTablePath extends Path {
             carbonDataFilePath.indexOf(CARBON_DATA_EXT));
     return carbonDataFileName;
   }
-}
+
+  /**
+   *
+   * @return carbon data extension
+   */
+  public static String getCarbonDataExtension() {
+    return CARBON_DATA_EXT;
+  }
+
+  /**
+   *
+   * @return carbon index extension
+   */
+  public static String getCarbonIndexExtension() {
+    return INDEX_FILE_EXT;
+  }
+
   /**
    * This method will remove strings in path and return short block id
    *

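For reference, the delete delta naming scheme these new helpers implement is part-<filePartNo>-<taskNo>-<timestamp>.deletedelta: getCarbonDeltaFileName builds such a name, while getTimeStampFromDeleteDeltaFile and getBlockNameFromDeleteDeltaFile split it back apart at the last hyphen. Below is a minimal standalone sketch of that convention; the class name and main method are illustrative only, not part of this patch.

// Hypothetical standalone sketch of the delta file naming convention; not CarbonData code.
public final class DeltaFileNameSketch {

  private static final String DATA_PART_PREFIX = "part-";
  private static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
  private static final String HYPHEN = "-";

  // mirrors the new getCarbonDeltaFileName helper
  static String buildDeltaFileName(String filePartNo, String taskNo,
      String factUpdateTimeStamp, String extension) {
    return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + factUpdateTimeStamp + extension;
  }

  // mirrors getTimeStampFromDeleteDeltaFile: text between the last hyphen and the extension dot
  static String timeStampOf(String fileName) {
    return fileName.substring(fileName.lastIndexOf(HYPHEN) + 1, fileName.lastIndexOf('.'));
  }

  // mirrors getBlockNameFromDeleteDeltaFile: everything before the last hyphen
  static String blockNameOf(String fileName) {
    return fileName.substring(0, fileName.lastIndexOf(HYPHEN));
  }

  public static void main(String[] args) {
    String name = buildDeltaFileName("0", "1", "1483692378000", DELETE_DELTA_FILE_EXT);
    System.out.println(name);               // part-0-1-1483692378000.deletedelta
    System.out.println(timeStampOf(name));  // 1483692378000
    System.out.println(blockNameOf(name));  // part-0-1
  }
}
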
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1088e5c..293237f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -199,6 +199,16 @@ public final class CarbonCommonConstants {
    * FACT_FILE_EXT
    */
   public static final String FACT_FILE_EXT = ".carbondata";
+
+  /**
+   * DELETE_DELTA_FILE_EXT
+   */
+  public static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
+
+  /**
+   * UPDATE_DELTA_FILE_EXT
+   */
+  public static final String UPDATE_DELTA_FILE_EXT = FACT_FILE_EXT;
   /**
    * MEASUREMETADATA_FILE_EXT
    */
@@ -437,6 +447,10 @@ public final class CarbonCommonConstants {
    */
   public static final String STORE_LOADSTATUS_SUCCESS = "Success";
   /**
+   * LOAD_STATUS UPDATE
+   */
+  public static final String STORE_LOADSTATUS_UPDATE = "Update";
+  /**
    * LOAD_STATUS FAILURE
    */
   public static final String STORE_LOADSTATUS_FAILURE = "Failure";
@@ -449,6 +463,10 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_METADATA_EXTENSION = ".metadata";
   /**
+   * CARBON_DEFAULT_STREAM_ENCODEFORMAT
+   */
+  public static final String CARBON_DEFAULT_STREAM_ENCODEFORMAT = "UTF-8";
+  /**
    * COMMA
    */
   public static final String COMMA = ",";
@@ -476,6 +494,11 @@ public final class CarbonCommonConstants {
    * CARBON_TIMESTAMP
    */
   public static final String CARBON_TIMESTAMP = "dd-MM-yyyy HH:mm:ss";
+
+  /**
+   * CARBON_TIMESTAMP_MILLIS
+   */
+  public static final String CARBON_TIMESTAMP_MILLIS = "dd-MM-yyyy HH:mm:ss:SSS";
   /**
    * NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
    */
@@ -533,6 +556,7 @@ public final class CarbonCommonConstants {
   /**
    * FACT_UPDATE_EXTENSION.
    */
+  public static final String FACT_UPDATE_EXTENSION = ".carbondata_update";
   public static final String FACT_DELETE_EXTENSION = "_delete";
   /**
    * MARKED_FOR_UPDATION
@@ -547,6 +571,10 @@ public final class CarbonCommonConstants {
    */
   public static final String LOADMETADATA_FILENAME = "tablestatus";
   /**
+   * TABLE UPDATE STATUS FILENAME
+   */
+  public static final String TABLEUPDATESTATUS_FILENAME = "tableupdatestatus";
+  /**
    * INMEMORY_REOCRD_SIZE
    */
   public static final String DETAIL_QUERY_BATCH_SIZE = "carbon.detail.batch.size";
@@ -784,9 +812,9 @@ public final class CarbonCommonConstants {
   public static final long CARBON_256MB = 256*1024*1024;
 
   /**
-   * SEGMENT_COMPACTED is property to indicate whether seg is compacted or not.
+   * COMPACTED is the property to indicate whether a segment is compacted or not.
    */
-  public static final String SEGMENT_COMPACTED = "Compacted";
+  public static final String COMPACTED = "Compacted";
 
   /**
    * ZOOKEEPERLOCK TYPE
@@ -824,6 +852,30 @@ public final class CarbonCommonConstants {
   public static final String DEFAULT_SEGMENT_LEVEL_THRESHOLD = "4,3";
 
   /**
+   * Number of update delta files which acts as the threshold for IUD horizontal compaction.
+   * The only accepted range is 0 - 10000; outside this range the system picks the default value.
+   */
+  public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
+          "carbon.horizontal.update.compaction.threshold";
+  /**
+   * Default count of update delta files which acts as the threshold for IUD compaction merge.
+   */
+  public static final String DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
+
+
+  /**
+   * Number of delete delta files which acts as the threshold for IUD horizontal compaction.
+   * The only accepted range is 0 - 10000; outside this range the system picks the default value.
+   */
+  public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION  =
+      "carbon.horizontal.delete.compaction.threshold";
+  /**
+   * Default count of delete delta files which acts as the threshold for IUD compaction merge.
+   */
+  public static final String DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
+
+
+  /**
    * default location of the carbon metastore db
    */
   public static final String METASTORE_LOCATION_DEFAULT_VAL = "../carbon.metastore";
@@ -894,6 +946,57 @@ public final class CarbonCommonConstants {
    * carbon data file version property
    */
   public static final String CARBON_DATA_FILE_VERSION = "carbon.data.file.version";
+
+  /**
+   * UPDATE_INDEX_FILE_EXT
+   */
+  public static final String UPDATE_INDEX_FILE_EXT = ".carbonindex";
+
+  /**
+   * Key word for true
+   */
+  public static final String KEYWORD_TRUE = "TRUE";
+
+  /**
+   * Key word for false
+   */
+  public static final String KEYWORD_FALSE = "FALSE";
+
+  /**
+   * hyphen
+   */
+  public static final String HYPHEN = "-";
+
+  /**
+   * columns which get updated in an update operation will have headers ending with this extension.
+   */
+  public static String UPDATED_COL_EXTENSION = "-updatedColumn";
+
+  /**
+   * appending the key to differentiate the update flow with insert flow.
+   */
+  public static String RDDUTIL_UPDATE_KEY = "UPDATE_";
+
+  /**
+   * to determine whether to persist the rdd or not.
+   */
+  public static String isPersistEnabled = "carbon.update.persist.enable";
+
+  /**
+   * for enabling or disabling Horizontal Compaction.
+   */
+  public static String isHorizontalCompactionEnabled = "carbon.horizontal.compaction.enable";
+
+  /**
+   * Default value for HorizontalCompaction is true.
+   */
+  public static String defaultIsHorizontalCompactionEnabled = "true";
+
+  /**
+   * by default rdd will be persisted in the update case.
+   */
+  public static String defaultValueIsPersistEnabled = "true";
+
   /**
    * current data file version
    */

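The two new IUD compaction thresholds are plain string properties with string defaults; resolving one to an int follows the same pattern this patch uses for NUM_CORES_LOADING in CarbonDeleteFilesDataReader.initThreadPoolSize(). A hedged sketch of that resolution, including the 0 - 10000 range documented above (the helper class itself is hypothetical, not part of this patch):

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

// Hypothetical helper showing how the new threshold property would be resolved to an int.
public final class IudCompactionThresholdSketch {

  static int updateDeltaFileThreshold() {
    String configured = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
            CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
    try {
      int threshold = Integer.parseInt(configured);
      // javadoc above documents an accepted range of 0 - 10000; otherwise use the default
      if (threshold >= 0 && threshold <= 10000) {
        return threshold;
      }
    } catch (NumberFormatException e) {
      // fall through to the default below
    }
    return Integer.parseInt(
        CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
  }
}
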
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
new file mode 100644
index 0000000..ea0323a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+
+import java.io.IOException;
+
+/**
+ * CarbonDeleteDeltaFileReader contains all methods to read delete delta file data
+ */
+public interface CarbonDeleteDeltaFileReader {
+
+  /**
+   * This method will be used to read the complete delete delta file.
+   * scenario:
+   * Whenever a query is executed, the delete delta file is read
+   * and the deleted data is excluded.
+   *
+   * @return All deleted records for specified block
+   * @throws IOException if an I/O error occurs
+   */
+  String read() throws IOException;
+  DeleteDeltaBlockDetails readJson() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
new file mode 100644
index 0000000..fdc5909
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import com.google.gson.Gson;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+
+import java.io.*;
+
+/**
+ * This class performs the functionality of reading a delete delta file
+ */
+public class CarbonDeleteDeltaFileReaderImpl implements CarbonDeleteDeltaFileReader {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDeleteDeltaFileReaderImpl.class.getName());
+
+  private String filePath;
+
+  private FileFactory.FileType fileType;
+
+  DataInputStream dataInputStream = null;
+
+  BufferedReader buffReader = null;
+
+  InputStreamReader inputStream = null;
+
+  private static final int DEFAULT_BUFFER_SIZE = 258;
+
+  /**
+   * Constructor
+   *
+   * @param filePath
+   * @param fileType
+   */
+  public CarbonDeleteDeltaFileReaderImpl(String filePath, FileFactory.FileType fileType) {
+    this.filePath = filePath;
+
+    this.fileType = fileType;
+  }
+
+  /**
+   * This method will be used to read the complete delete delta file.
+   * scenario:
+   * Whenever a query is executed, the delete delta file is read
+   * to exclude the deleted data.
+   *
+   * @return All deleted records for the specified block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public String read() throws IOException {
+    // Configure Buffer based on our requirement
+    char[] buffer = new char[DEFAULT_BUFFER_SIZE];
+    StringWriter sw = new StringWriter();
+    dataInputStream = FileFactory.getDataInputStream(filePath, fileType);
+    inputStream = new InputStreamReader(dataInputStream,
+        CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+    buffReader = new BufferedReader(inputStream);
+    int n = 0;
+    while (-1 != (n = inputStream.read(buffer))) {
+      sw.write(buffer, 0, n);
+    }
+    return sw.toString();
+  }
+
+  @Override public DeleteDeltaBlockDetails readJson() throws IOException {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    DeleteDeltaBlockDetails deleteDeltaBlockDetails;
+    AtomicFileOperations fileOperation =
+        new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath));
+
+    try {
+      if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) {
+        return new DeleteDeltaBlockDetails("");
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      buffReader = new BufferedReader(inStream);
+      deleteDeltaBlockDetails =
+          gsonObjectToRead.fromJson(buffReader, DeleteDeltaBlockDetails.class);
+    } catch (IOException e) {
+      return new DeleteDeltaBlockDetails("");
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    return deleteDeltaBlockDetails;
+  }
+
+  /**
+   * Returns all deleted records from specified delete delta file
+   *
+   * @return
+   * @throws IOException
+   */
+  public int[] getDeleteDeltaRows() throws IOException {
+    String[] stringRows = read().split(CarbonCommonConstants.COMMA);
+    int[] rows = new int[stringRows.length];
+    int rowsLength = stringRows.length;
+    for (int i = 0; i < rowsLength; i++) {
+      try {
+        rows[i] = Integer.parseInt(stringRows[i]);
+      } catch (NumberFormatException nfe) {
+        LOGGER.error("Invalid row : " + stringRows[i] + nfe.getLocalizedMessage());
+        throw new IOException("Invalid row : " + nfe.getLocalizedMessage());
+      }
+    }
+
+    return rows;
+  }
+
+}

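A short usage sketch for the reader above: given the path of a single delete delta file, readJson() deserializes the Gson-written DeleteDeltaBlockDetails, grouped by blocklet id. The caller class and the file path below are illustrative only:

import java.io.IOException;

import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.reader.CarbonDeleteDeltaFileReaderImpl;
import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
import org.apache.carbondata.core.update.DeleteDeltaBlockletDetails;

// Illustrative caller only; the delta file path is a made-up example.
public final class DeleteDeltaReaderExample {
  public static void main(String[] args) throws IOException {
    String deltaFile =
        "/store/db/table/Fact/Part0/Segment_0/part-0-0-1483692378000.deletedelta";

    CarbonDeleteDeltaFileReaderImpl reader =
        new CarbonDeleteDeltaFileReaderImpl(deltaFile, FileFactory.getFileType(deltaFile));

    // per-block details, grouped by blocklet id
    DeleteDeltaBlockDetails blockDetails = reader.readJson();
    for (DeleteDeltaBlockletDetails blocklet : blockDetails.getBlockletDetails()) {
      System.out.println("blocklet " + blocklet.getId()
          + " -> deleted rows " + blocklet.getDeletedRows());
    }
  }
}
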
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
new file mode 100644
index 0000000..6cda03d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteFilesDataReader.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.reader;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.update.DeleteDeltaBlockletDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.commons.lang.ArrayUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.*;
+
+/**
+ * This class performs the functionality of reading multiple delete delta files
+ */
+public class CarbonDeleteFilesDataReader {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDeleteFilesDataReader.class.getName());
+
+  /**
+   * thread pool size to be used for reading delete delta files
+   */
+  protected int thread_pool_size;
+
+  /**
+   * Constructor
+   */
+  public CarbonDeleteFilesDataReader() {
+    initThreadPoolSize();
+  }
+
+  /**
+   * This method will initialize the thread pool size to be used for creating the
+   * max number of threads for a job
+   */
+  private void initThreadPoolSize() {
+    try {
+      thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+    } catch (NumberFormatException e) {
+      thread_pool_size = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+  }
+
+  /**
+   * Returns all deleted records from all specified delta files
+   *
+   * @param deltaFiles
+   * @return
+   * @throws Exception
+   */
+  public int[] getDeleteDataFromAllFiles(List<String> deltaFiles, String blockletId)
+      throws Exception {
+    List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>(deltaFiles.size());
+    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+    for (final String deltaFile : deltaFiles) {
+      taskSubmitList.add(executorService.submit(new Callable<DeleteDeltaBlockDetails>() {
+        @Override public DeleteDeltaBlockDetails call() throws IOException {
+          CarbonDeleteDeltaFileReaderImpl deltaFileReader =
+              new CarbonDeleteDeltaFileReaderImpl(deltaFile, FileFactory.getFileType(deltaFile));
+          return deltaFileReader.readJson();
+        }
+      }));
+    }
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(30, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error while reading the delete delta files : " + e.getMessage());
+    }
+
+    Set<Integer> result = new TreeSet<Integer>();
+    for (int i = 0; i < taskSubmitList.size(); i++) {
+      try {
+        List<DeleteDeltaBlockletDetails> blockletDetails =
+            taskSubmitList.get(i).get().getBlockletDetails();
+        result.addAll(
+            blockletDetails.get(blockletDetails.indexOf(new DeleteDeltaBlockletDetails(blockletId)))
+                .getDeletedRows());
+      } catch (Throwable e) {
+        LOGGER.error(e.getMessage());
+        throw new Exception(e.getMessage());
+      }
+    }
+    return ArrayUtils.toPrimitive(result.toArray(new Integer[result.size()]));
+
+  }
+
+  public DeleteDeltaBlockDetails getCompactedDeleteDeltaFileFromBlock(List<String> deltaFiles,
+                                                                      String blockName) throws Exception {
+    // get the data.
+    List<Future<DeleteDeltaBlockDetails>> taskSubmitList = new ArrayList<>(deltaFiles.size());
+    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+    for (final String deltaFile : deltaFiles) {
+      taskSubmitList.add(executorService.submit(new Callable<DeleteDeltaBlockDetails>() {
+        @Override public DeleteDeltaBlockDetails call() throws IOException {
+          CarbonDeleteDeltaFileReaderImpl deltaFileReader =
+              new CarbonDeleteDeltaFileReaderImpl(deltaFile, FileFactory.getFileType(deltaFile));
+          return deltaFileReader.readJson();
+        }
+      }));
+    }
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(30, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error while reading the delete delta files : " + e.getMessage());
+    }
+
+    // Get a new DeleteDeltaBlockDetails as result set where all the data will be merged
+    // based on each Blocklet.
+    DeleteDeltaBlockDetails deleteDeltaResultSet = new DeleteDeltaBlockDetails(blockName);
+
+    for (int i = 0; i < taskSubmitList.size(); i++) {
+      try {
+        List<DeleteDeltaBlockletDetails> blockletDetails =
+            taskSubmitList.get(i).get().getBlockletDetails();
+        for(DeleteDeltaBlockletDetails blocklet : blockletDetails ) {
+          deleteDeltaResultSet.addBlockletDetails(blocklet);
+        }
+      } catch (Throwable e) {
+        LOGGER.error(e.getMessage());
+        throw new Exception(e.getMessage());
+      }
+    }
+    return deleteDeltaResultSet;
+  }
+}
+
+

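To see how the multi-file reader above is meant to be driven: it fans the supplied delta file paths out over a fixed thread pool, reads each file's DeleteDeltaBlockDetails, and merges the deleted row ids of the requested blocklet into one sorted array. A hedged sketch (paths and blocklet id are made-up examples):

import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;

// Illustrative caller only.
public final class DeleteFilesDataReaderExample {
  public static void main(String[] args) throws Exception {
    List<String> deltaFiles = Arrays.asList(
        "/store/db/table/Fact/Part0/Segment_0/part-0-0-1483692378000.deletedelta",
        "/store/db/table/Fact/Part0/Segment_0/part-0-0-1483695999000.deletedelta");

    CarbonDeleteFilesDataReader reader = new CarbonDeleteFilesDataReader();

    // merged, sorted row ids deleted from blocklet "0" across both delta files
    int[] deletedRows = reader.getDeleteDataFromAllFiles(deltaFiles, "0");
    System.out.println(Arrays.toString(deletedRows));
  }
}
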
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java b/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java
new file mode 100644
index 0000000..b7a7107
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.update;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class stores the block details of delete delta file
+ */
+public class DeleteDeltaBlockDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914918495724L;
+
+  private List<DeleteDeltaBlockletDetails> blockletDetails;
+  private String blockName;
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteDeltaBlockDetails.class.getName());
+
+  public DeleteDeltaBlockDetails(String blockName) {
+    this.blockName = blockName;
+    blockletDetails = new ArrayList<DeleteDeltaBlockletDetails>();
+  }
+
+  public String getBlockName() {
+    return blockName;
+  }
+
+  public void setBlockName(String blockName) {
+    this.blockName = blockName;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || !(obj instanceof DeleteDeltaBlockDetails)) return false;
+
+    DeleteDeltaBlockDetails that = (DeleteDeltaBlockDetails) obj;
+
+    return blockName.equals(that.blockName);
+
+  }
+
+  @Override public int hashCode() {
+    return blockName.hashCode();
+  }
+
+  public List<DeleteDeltaBlockletDetails> getBlockletDetails() {
+    return blockletDetails;
+  }
+
+  public boolean addBlockletDetails(DeleteDeltaBlockletDetails blocklet) {
+    int index = blockletDetails.indexOf(blocklet);
+    if (blockletDetails.isEmpty() || index == -1) {
+      return blockletDetails.add(blocklet);
+    } else {
+      return blockletDetails.get(index).addDeletedRows(blocklet.getDeletedRows());
+    }
+  }
+
+  public boolean addBlocklet(String blockletId, String offset) throws Exception {
+    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId);
+    try {
+      blocklet.addDeletedRow(CarbonUpdateUtil.getIntegerValue(offset));
+      return addBlockletDetails(blocklet);
+    } catch (Exception e) {
+      throw e;
+    }
+
+  }
+}

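The merge behaviour of addBlockletDetails is easiest to see in isolation: DeleteDeltaBlockletDetails (next file) compares equal by blocklet id, so adding a second entry with the same id folds its deleted rows into the existing entry instead of appending a duplicate. A small sketch with illustrative values:

import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
import org.apache.carbondata.core.update.DeleteDeltaBlockletDetails;

// Demonstrates how blocklet entries merge inside one block's details.
public final class DeleteDeltaModelExample {
  public static void main(String[] args) {
    DeleteDeltaBlockDetails block = new DeleteDeltaBlockDetails("part-0-0-1483692378000");

    DeleteDeltaBlockletDetails first = new DeleteDeltaBlockletDetails("0");
    first.addDeletedRow(3);
    first.addDeletedRow(7);
    block.addBlockletDetails(first);   // new blocklet id "0" -> appended

    DeleteDeltaBlockletDetails second = new DeleteDeltaBlockletDetails("0");
    second.addDeletedRow(5);
    block.addBlockletDetails(second);  // same id -> rows merged into the existing entry

    // prints: 0 -> [3, 5, 7]  (a TreeSet keeps the deleted rows sorted)
    for (DeleteDeltaBlockletDetails blocklet : block.getBlockletDetails()) {
      System.out.println(blocklet.getId() + " -> " + blocklet.getDeletedRows());
    }
  }
}
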
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java
new file mode 100644
index 0000000..f2ee86c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.update;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class stores the blocklet details of a delete delta file
+ */
+public class DeleteDeltaBlockletDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914911491724L;
+  private String id;
+  private Set<Integer> deletedRows;
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
+
+  public DeleteDeltaBlockletDetails(String id) {
+    this.id = id;
+    deletedRows = new TreeSet<Integer>();
+  }
+
+  public boolean addDeletedRows(Set<Integer> rows) {
+    return deletedRows.addAll(rows);
+  }
+
+  public boolean addDeletedRow(Integer row) {
+    return deletedRows.add(row);
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public Set<Integer> getDeletedRows() {
+    return deletedRows;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || !(obj instanceof DeleteDeltaBlockletDetails)) {
+      return false;
+    }
+
+    DeleteDeltaBlockletDetails that = (DeleteDeltaBlockletDetails) obj;
+    return id.equals(that.id);
+  }
+
+  @Override public int hashCode() {
+    return id.hashCode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java b/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java
new file mode 100644
index 0000000..e485ff2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.update;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class stores the segment details of table update status file
+ */
+public class SegmentUpdateDetails implements Serializable {
+
+  private static final long serialVersionUID = 1206104914918491724L;
+  private String segmentName;
+  private String blockName;
+  private String status = "";
+  private String deleteDeltaEndTimestamp = "";
+  private String deleteDeltaStartTimestamp = "";
+  private String actualBlockName;
+  private String deletedRowsInBlock = "0";
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SegmentUpdateDetails.class.getName());
+
+  public String getDeleteDeltaEndTimestamp() {
+    return deleteDeltaEndTimestamp;
+  }
+
+  public void setDeleteDeltaEndTimestamp(String deleteDeltaEndTimestamp) {
+    this.deleteDeltaEndTimestamp = deleteDeltaEndTimestamp;
+  }
+
+  public String getSegmentName() {
+    return segmentName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    this.segmentName = segmentName;
+  }
+
+  public String getBlockName() {
+    return blockName;
+  }
+
+  public void setBlockName(String blockName) {
+    this.blockName = blockName;
+  }
+
+  public String getDeleteDeltaStartTimestamp() {
+    return deleteDeltaStartTimestamp;
+  }
+
+  public void setDeleteDeltaStartTimestamp(String deleteDeltaStartTimestamp) {
+    this.deleteDeltaStartTimestamp = deleteDeltaStartTimestamp;
+  }
+
+  public void setStatus(String status) { this.status = status;}
+
+  public String getStatus() {return this.status;}
+
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = segmentName.hashCode();
+    result = prime * result + blockName.hashCode();
+    return result;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+
+    }
+    if (!(obj instanceof SegmentUpdateDetails)) {
+      return false;
+    }
+    SegmentUpdateDetails other = (SegmentUpdateDetails) obj;
+    if (segmentName == null) {
+      if (other.segmentName != null) {
+        return false;
+      }
+    } else if (!segmentName.equals(other.segmentName)) {
+      return false;
+    }
+    if (blockName == null) {
+      if (other.blockName != null) {
+        return false;
+      }
+    } else if (!blockName.equals(other.blockName)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * return deleteDeltaTime as long
+   *
+   * @return
+   */
+  public long getDeleteDeltaEndTimeAsLong() {
+    return getTimeStampAsLong(deleteDeltaEndTimestamp);
+  }
+
+  /**
+   * return deleteDeltaTime as long
+   *
+   * @return
+   */
+  public long getDeleteDeltaStartTimeAsLong() {
+
+    return getTimeStampAsLong(deleteDeltaStartTimestamp);
+  }
+
+  /**
+   * returns complete block name
+   *
+   * @return
+   */
+  public String getActualBlockName() {
+    return actualBlockName;
+  }
+
+  public void setActualBlockName(String actualBlockName) {
+    this.actualBlockName = actualBlockName;
+  }
+
+  /**
+   * returns timestamp as long value
+   *
+   * @param timestamp
+   * @return
+   */
+  private Long getTimeStampAsLong(String timestamp) {
+    long longValue = 0;
+    try {
+      longValue = Long.parseLong(timestamp);
+    } catch (NumberFormatException nfe) {
+      String errorMsg = "Invalid timestamp : " + timestamp;
+      LOGGER.debug(errorMsg);
+    }
+    return longValue;
+  }
+
+  public String getDeletedRowsInBlock() {
+    return deletedRowsInBlock;
+  }
+
+  public void setDeletedRowsInBlock(String deletedRowsInBlock) {
+    this.deletedRowsInBlock = deletedRowsInBlock;
+  }
+}

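SegmentUpdateDetails is the per-block record persisted in the tableupdatestatus file; the delete delta start/end timestamps it carries define the time window that SegmentUpdateStatusManager (next file) uses to pick the matching .deletedelta files for a block. A hedged sketch of building one record, with illustrative values only:

import org.apache.carbondata.core.update.SegmentUpdateDetails;

// Illustrative values only; in the real flow these fields are filled by the delete command.
public final class SegmentUpdateDetailsExample {
  public static void main(String[] args) {
    SegmentUpdateDetails details = new SegmentUpdateDetails();
    details.setSegmentName("0");
    details.setBlockName("part-0-0");
    details.setActualBlockName("part-0-0-1483692378000.carbondata");
    details.setDeleteDeltaStartTimestamp("1483692378000");
    details.setDeleteDeltaEndTimestamp("1483695999000");
    details.setDeletedRowsInBlock("42");

    // timestamps are stored as strings and parsed on demand
    long window = details.getDeleteDeltaEndTimeAsLong()
        - details.getDeleteDeltaStartTimeAsLong();
    System.out.println("delete delta window (ms): " + window);
  }
}
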
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
new file mode 100644
index 0000000..0104eab
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentUpdateStatusManager.java
@@ -0,0 +1,968 @@
+package org.apache.carbondata.core.updatestatus;
+
+import com.google.gson.Gson;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+import org.apache.carbondata.core.update.TupleIdEnum;
+import org.apache.carbondata.core.update.UpdateVO;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.fileoperations.FileWriteOperation;
+import org.apache.carbondata.locks.CarbonLockFactory;
+import org.apache.carbondata.locks.ICarbonLock;
+import org.apache.carbondata.locks.LockUsage;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Manages Segment & block status of carbon table
+ */
+public class SegmentUpdateStatusManager {
+
+  /**
+   * logger
+   */
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(SegmentUpdateStatusManager.class.getName());
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+  private LoadMetadataDetails[] segmentDetails;
+  private SegmentUpdateDetails[] updateDetails;
+  private CarbonTablePath carbonTablePath;
+  private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+
+  /**
+   * Constructor
+   *
+   * @param absoluteTableIdentifier
+   */
+  public SegmentUpdateStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+        absoluteTableIdentifier.getCarbonTableIdentifier());
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    // currently it is used only for read function scenarios, as file update always needs to work
+    // on latest file status.
+    segmentDetails =
+        segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    updateDetails = readLoadMetadata();
+    populateMap();
+  }
+
+  /**
+   * populate the block and its details in a map.
+   */
+  private void populateMap() {
+    blockAndDetailsMap = new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for(SegmentUpdateDetails blockDetails : updateDetails) {
+
+      String blockIdentifier = CarbonUpdateUtil
+          .getSegmentBlockNameKey(blockDetails.getSegmentName(), blockDetails.getActualBlockName());
+
+      blockAndDetailsMap.put(blockIdentifier, blockDetails);
+
+    }
+
+  }
+
+  /**
+   *
+   * @param segID
+   * @param actualBlockName
+   * @return null if block is not present in segment update status.
+   */
+  public SegmentUpdateDetails getDetailsForABlock(String segID, String actualBlockName) {
+
+    String blockIdentifier = CarbonUpdateUtil
+        .getSegmentBlockNameKey(segID, actualBlockName);
+
+    return blockAndDetailsMap.get(blockIdentifier);
+
+  }
+
+  /**
+   *
+   * @param key will be like (segid/blockname)  0/0-0-5464654654654
+   * @return
+   */
+  public SegmentUpdateDetails getDetailsForABlock(String key) {
+
+    return blockAndDetailsMap.get(key);
+
+  }
+
+
+
+  /**
+   * Returns the LoadMetadata Details
+   * @return
+   */
+  public LoadMetadataDetails[] getLoadMetadataDetails() {
+    return segmentDetails;
+  }
+
+  /**
+   *
+   * @param loadMetadataDetails
+   */
+  public void setLoadMetadataDetails(LoadMetadataDetails[] loadMetadataDetails) {
+    this.segmentDetails = loadMetadataDetails;
+  }
+
+  /**
+   * Returns the UpdateStatus Details.
+   * @return
+   */
+  public SegmentUpdateDetails[] getUpdateStatusDetails() {
+    return updateDetails;
+  }
+
+  /**
+   *
+   * @param segmentUpdateDetails
+   */
+  public void setUpdateStatusDetails(SegmentUpdateDetails[] segmentUpdateDetails) {
+    this.updateDetails = segmentUpdateDetails;
+  }
+
+  /**
+   * This will return the lock object used to lock the table update status file before updation.
+   *
+   * @return
+   */
+  public ICarbonLock getTableUpdateStatusLock() {
+    return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+        LockUsage.TABLE_UPDATE_STATUS_LOCK);
+  }
+
+  /**
+   * Returns all delete delta files of specified block
+   *
+   * @param tupleId
+   * @return
+   * @throws Exception
+   */
+  public List<String> getDeleteDeltaFiles(String tupleId) throws Exception {
+    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
+  }
+
+
+  /**
+   * Returns all update delta files of specified Segment.
+   *
+   * @param segmentId
+   * @return
+   * @throws Exception
+   */
+  public List<String> getUpdateDeltaFiles(final String segmentId) {
+    List<String> updatedDeltaFilesList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    String endTimeStamp = "";
+    String startTimeStamp = "";
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    CarbonFile segDir =
+        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+    for (LoadMetadataDetails eachSeg : segmentDetails) {
+      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
+        // if the segment is found then take the start and end time stamp.
+        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
+        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
+      }
+    }
+    // if start timestamp is empty then no update delta is found. so return empty list.
+    if (startTimeStamp.isEmpty()) {
+      return updatedDeltaFilesList;
+    }
+    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
+    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
+
+    // else scan the segment for the delta files with the respective timestamp.
+    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile pathName) {
+        String fileName = pathName.getName();
+        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
+          String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+          long timestamp = Long.parseLong(firstPart
+              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                  firstPart.length()));
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+
+            // if marked for delete then it is invalid.
+            if (!isBlockValid(segmentId, fileName)) {
+              return false;
+            }
+
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+
+    for (CarbonFile cfile : files) {
+      updatedDeltaFilesList.add(cfile.getCanonicalPath());
+    }
+
+    return updatedDeltaFilesList;
+  }
+
+  /**
+   * Returns all deleted records of specified block
+   *
+   * @param tupleId
+   * @return
+   * @throws Exception
+   */
+  public int[] getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
+    List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
+    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
+    String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID);
+    return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId);
+  }
+
+
+
+  /**
+   * Returns all delta file paths of specified block
+   *
+   * @param tupleId
+   * @param extension
+   * @return
+   * @throws Exception
+   */
+  public List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+    try {
+      CarbonTablePath carbonTablePath = CarbonStorePath
+          .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+              absoluteTableIdentifier.getCarbonTableIdentifier());
+      String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
+      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+      String completeBlockName = CarbonTablePath.addDataPartPrefix(
+          CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
+              + CarbonCommonConstants.FACT_FILE_EXT);
+      String blockPath =
+          carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+      CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
+      if (!file.exists()) {
+        throw new Exception("Invalid tuple id " + tupleId);
+      }
+      String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
+      //blockName without timestamp
+      final String blockNameFromTuple =
+          blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
+      SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
+          readLoadMetadata();
+      return getDeltaFiles(file, blockNameFromTuple, listOfSegmentUpdateDetailsArray, extension,
+          segment);
+
+    } catch (Exception ex) {
+      String errorMsg = "Invalid tuple id " + tupleId;
+      LOG.error(errorMsg);
+      throw new Exception(errorMsg);
+    }
+  }
+
+  /**
+   * This method returns the list of Blocks associated with the segment
+   * from the SegmentUpdateDetails List.
+   * @param segmentName
+   * @return
+   */
+  public List<String> getBlockNameFromSegment(String segmentName) {
+    List<String> blockNames = new ArrayList<String>();
+    for (SegmentUpdateDetails block : updateDetails) {
+      if (block.getSegmentName().equalsIgnoreCase(segmentName) && !CarbonUpdateUtil
+          .isBlockInvalid(block.getStatus())) {
+        blockNames.add(block.getBlockName());
+      }
+    }
+    return blockNames;
+  }
+
+  /**
+   *
+   * @param segName
+   * @param blockName
+   * @return
+   */
+  public boolean isBlockValid(String segName, String blockName) {
+
+    SegmentUpdateDetails details = getDetailsForABlock(segName, blockName);
+
+    if (details == null || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+      return true;
+    }
+
+    return false;
+  }
+  /**
+   * Returns all delta file paths of specified block
+   *
+   * @param blockDir
+   * @param blockNameFromTuple
+   * @param listOfSegmentUpdateDetailsArray
+   * @param extension
+   * @return
+   */
+  public List<String> getDeltaFiles(CarbonFile blockDir, final String blockNameFromTuple,
+                                    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray, final String extension,
+                                    String segment) {
+    List<String> deleteFileList = null;
+    for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
+      if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
+          .equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+        final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);
+        // If there is no delete delta file, then return null.
+        if (deltaStartTimestamp == 0) {
+          return deleteFileList;
+        }
+        final long deltaEndTimeStamp = getEndTimeOfDeltaFile(extension, block);
+
+        return getFilePaths(blockDir, blockNameFromTuple, extension, deleteFileList,
+            deltaStartTimestamp, deltaEndTimeStamp);
+      }
+    }
+    return deleteFileList;
+  }
+
+  private List<String> getFilePaths(CarbonFile blockDir, final String blockNameFromTuple,
+                                    final String extension, List<String> deleteFileList, final long deltaStartTimestamp,
+                                    final long deltaEndTimeStamp) {
+    CarbonFile[] files = blockDir.getParentFile().listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile pathName) {
+        String fileName = pathName.getName();
+        if (fileName.endsWith(extension)) {
+          String firstPart = fileName.substring(0, fileName.indexOf('.'));
+          String blockName =
+              firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
+          long timestamp = Long.parseLong(firstPart
+              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                  firstPart.length()));
+          if (blockNameFromTuple.equals(blockName)
+              && (Long.compare(timestamp, deltaEndTimeStamp) <= 0)
+              && (Long.compare(timestamp, deltaStartTimestamp) >= 0)) {
+            return true;
+          }
+        }
+        return false;
+      }
+    });
+
+    for (CarbonFile cfile : files) {
+      if (null == deleteFileList) {
+        deleteFileList = new ArrayList<String>(files.length);
+      }
+      deleteFileList.add(cfile.getCanonicalPath());
+    }
+    return deleteFileList;
+  }
+
+  /**
+   * Returns all delete delta files of a block within its recorded timestamp range.
+   *
+   * @param segmentId segment id
+   * @param blockName block name
+   * @return matching delete delta files, or null if the block has no valid entry
+   */
+  public CarbonFile[] getDeleteDeltaFilesList(final String segmentId, final String blockName) {
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+
+    CarbonFile segDir =
+        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+
+    for (SegmentUpdateDetails block : updateDetails) {
+      if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
+          (block.getSegmentName().equalsIgnoreCase(segmentId))
+          && !CarbonUpdateUtil.isBlockInvalid((block.getStatus()))) {
+        final long deltaStartTimestamp =
+            getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+        final long deltaEndTimeStamp =
+            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+        CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
+
+          @Override public boolean accept(CarbonFile pathName) {
+            String fileName = pathName.getName();
+            if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+              String firstPart = fileName.substring(0, fileName.indexOf('.'));
+              String blkName = firstPart.substring(0, firstPart.lastIndexOf("-"));
+              long timestamp = Long.parseLong(
+                  firstPart.substring(firstPart.lastIndexOf("-") + 1, firstPart.length()));
+              if (blockName.equals(blkName) && (Long.compare(timestamp, deltaEndTimeStamp) <= 0)
+                  && (Long.compare(timestamp, deltaStartTimestamp) >= 0)) {
+                return true;
+              }
+            }
+            return false;
+          }
+        });
+
+        return files;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns all update delta files of the specified segment.
+   *
+   * @param segmentId segment id
+   * @param validUpdateFiles if true, only files within the valid timestamp range are returned
+   * @param fileExtension file extension to filter on
+   * @param excludeOriginalFact if true, the original fact file is excluded
+   * @param allFilesOfSegment all files present in the segment directory
+   * @return matching update delta files
+   */
+  public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean validUpdateFiles,
+                                              final String fileExtension, final boolean excludeOriginalFact,
+                                              CarbonFile[] allFilesOfSegment) {
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    String endTimeStamp = "";
+    String startTimeStamp = "";
+    long factTimeStamp = 0;
+
+    LoadMetadataDetails[] segmentDetails =
+        segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+
+    for (LoadMetadataDetails eachSeg : segmentDetails) {
+      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
+        // if the segment is found then take the start and end time stamp.
+        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
+        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
+        factTimeStamp = eachSeg.getLoadStartTime();
+      }
+    }
+
+    // if start timestamp is empty then no update delta is found. so return empty list.
+    if (startTimeStamp.isEmpty()) {
+      return new CarbonFile[0];
+    }
+
+    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
+    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
+    final long factTimeStampFinal = factTimeStamp;
+
+    List<CarbonFile> listOfCarbonFiles =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // scan the segment for delta files that fall within the update timestamp range.
+
+    for (CarbonFile eachFile : allFilesOfSegment) {
+
+      String fileName = eachFile.getName();
+      if (fileName.endsWith(fileExtension)) {
+        String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+        long timestamp = Long.parseLong(firstPart
+            .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                firstPart.length()));
+
+        if (excludeOriginalFact) {
+          if (Long.compare(factTimeStampFinal, timestamp) == 0) {
+            continue;
+          }
+        }
+
+        if (validUpdateFiles) {
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+            listOfCarbonFiles.add(eachFile);
+          }
+        } else {
+          // invalid cases.
+          if (Long.compare(timestamp, startTimeStampFinal) < 0) {
+            listOfCarbonFiles.add(eachFile);
+          }
+        }
+      }
+    }
+
+    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
+  }
+
+  /**
+   * Returns all update delta files of the specified segment, additionally skipping
+   * blocks that the update status details mark as invalid.
+   *
+   * @param segmentId segment id
+   * @param validUpdateFiles if true, only files within the valid timestamp range are returned
+   * @param fileExtension file extension to filter on
+   * @param excludeOriginalFact if true, the original fact file is excluded
+   * @param allFilesOfSegment all files present in the segment directory
+   * @return matching update delta files
+   */
+  public CarbonFile[] getUpdateDeltaFilesForSegment(String segmentId,
+                                                    final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
+                                                    CarbonFile[] allFilesOfSegment) {
+
+    String endTimeStamp = "";
+    String startTimeStamp = "";
+    long factTimeStamp = 0;
+
+    for (LoadMetadataDetails eachSeg : segmentDetails) {
+      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
+        // if the segment is found then take the start and end time stamp.
+        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
+        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
+        factTimeStamp = eachSeg.getLoadStartTime();
+      }
+    }
+
+    // if start timestamp is empty then no update delta is found. so return empty list.
+    if (startTimeStamp.isEmpty()) {
+      return new CarbonFile[0];
+    }
+
+    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
+    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
+    final long factTimeStampFinal = factTimeStamp;
+
+    List<CarbonFile> listOfCarbonFiles =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // scan the segment for delta files that fall within the update timestamp range.
+
+    for (CarbonFile eachFile : allFilesOfSegment) {
+
+      String fileName = eachFile.getName();
+      if (fileName.endsWith(fileExtension)) {
+        String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+        long timestamp = Long.parseLong(firstPart
+            .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                firstPart.length()));
+
+        if (excludeOriginalFact) {
+          if (Long.compare(factTimeStampFinal, timestamp) == 0) {
+            continue;
+          }
+        }
+
+        if (validUpdateFiles) {
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+
+            boolean validBlock = true;
+
+            for(SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
+              if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
+                  && CarbonUpdateUtil.isBlockInvalid(blockDetails.getStatus())) {
+                validBlock = false;
+              }
+            }
+
+            if (validBlock) {
+              listOfCarbonFiles.add(eachFile);
+            }
+
+          }
+        } else {
+          // invalid cases.
+          if (Long.compare(timestamp, startTimeStampFinal) < 0) {
+            listOfCarbonFiles.add(eachFile);
+          }
+        }
+      }
+    }
+
+    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
+  }
+
+  /**
+   * Returns the start timestamp of the delta file for the given extension.
+   *
+   * @param extension delta file extension
+   * @param block segment update details of the block
+   * @return start timestamp, or 0 if the extension is not handled
+   */
+  private long getStartTimeOfDeltaFile(String extension, SegmentUpdateDetails block) {
+    long startTimestamp;
+    switch (extension) {
+      case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
+        startTimestamp = block.getDeleteDeltaStartTimeAsLong();
+        break;
+      default:
+        startTimestamp = 0;
+    }
+    return startTimestamp;
+  }
+
+  /**
+   * Returns the end timestamp of the delta file for the given extension.
+   *
+   * @param extension delta file extension
+   * @param block segment update details of the block
+   * @return end timestamp, or 0 if the extension is not handled
+   */
+  private long getEndTimeOfDeltaFile(String extension, SegmentUpdateDetails block) {
+    long endTimestamp;
+    switch (extension) {
+      case CarbonCommonConstants.DELETE_DELTA_FILE_EXT:
+        endTimestamp = block.getDeleteDeltaEndTimeAsLong();
+        break;
+      default:
+        endTimestamp = 0;
+    }
+    return endTimestamp;
+  }
+
+  /**
+   * Loads the segment update details from the table update status file.
+   *
+   * @return segment update details, or an empty array if the status file does not exist
+   */
+  public SegmentUpdateDetails[] readLoadMetadata() {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray;
+
+    // get the updated status file identifier from the table status.
+    String tableUpdateStatusIdentifier = getUpdatedStatusIdentifier();
+
+    if(null == tableUpdateStatusIdentifier) {
+      return new SegmentUpdateDetails[0];
+    }
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableUpdateStatusPath =
+        carbonTablePath.getMetadataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+            + tableUpdateStatusIdentifier;
+    AtomicFileOperations fileOperation = new AtomicFileOperationsImpl(tableUpdateStatusPath,
+        FileFactory.getFileType(tableUpdateStatusPath));
+
+    try {
+      if (!FileFactory
+          .isFileExist(tableUpdateStatusPath, FileFactory.getFileType(tableUpdateStatusPath))) {
+        return new SegmentUpdateDetails[0];
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+      buffReader = new BufferedReader(inStream);
+      listOfSegmentUpdateDetailsArray =
+          gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class);
+    } catch (IOException e) {
+      return new SegmentUpdateDetails[0];
+    } finally {
+      closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    return listOfSegmentUpdateDetailsArray;
+  }
+
+  /**
+   * Returns the update status file name recorded in the table status.
+   *
+   * @return update status file name, or null if no load metadata exists
+   */
+  private String getUpdatedStatusIdentifier() {
+    SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    LoadMetadataDetails[] loadDetails =
+        ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    if (loadDetails.length == 0) {
+      return null;
+    }
+    return loadDetails[0].getUpdateStatusFileName();
+  }
+
+  /**
+   * Writes the segment update details into the table update status file.
+   *
+   * @param listOfSegmentUpdateDetailsArray segment update details to persist
+   * @param updateStatusFileIdentifier identifier used to derive the update status file name
+   * @throws IOException if an I/O error occurs while writing
+   */
+  public void writeLoadDetailsIntoFile(List<SegmentUpdateDetails> listOfSegmentUpdateDetailsArray,
+      String updateStatusFileIdentifier) throws IOException {
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String fileLocation =
+        carbonTablePath.getMetadataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR
+            + CarbonUpdateUtil.getUpdateStatusFileName(updateStatusFileIdentifier);
+
+    AtomicFileOperations fileWrite =
+        new AtomicFileOperationsImpl(fileLocation, FileFactory.getFileType(fileLocation));
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    // write the updated data into the metadata file.
+
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+
+      String metadataInstance = gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray);
+      brWriter.write(metadataInstance);
+    } catch (IOException ioe) {
+      LOG.error("Error message: " + ioe.getLocalizedMessage());
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
+    }
+
+  }
+
+  /**
+   * Compares the passed timestamp with the delete delta end timestamp recorded
+   * in the update status file for the block. If the two differ, the status file
+   * timestamp is returned so that the cache can be refreshed; returns null otherwise.
+   *
+   * @param completeBlockName tuple id identifying the block
+   * @param timestamp cached timestamp to compare against
+   * @return latest delete delta end timestamp, or null if the cache is up to date
+   */
+  public String getTimestampForRefreshCache(String completeBlockName, String timestamp) {
+    long cacheTimestamp = 0;
+    if (null != timestamp) {
+      cacheTimestamp = CarbonUpdateUtil.getTimeStampAsLong(timestamp);
+    }
+    String blockName = CarbonTablePath.addDataPartPrefix(CarbonUpdateUtil.getBlockName(
+        CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.BLOCK_ID)));
+    String segmentId =
+        CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.SEGMENT_ID);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
+        readLoadMetadata();
+    for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
+      if (segmentId.equalsIgnoreCase(block.getSegmentName()) && block.getBlockName()
+          .equalsIgnoreCase(blockName) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+        long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong();
+        if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) {
+          return null;
+        } else {
+          return block.getDeleteDeltaEndTimestamp();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This method closes the streams
+   *
+   * @param streams - streams to close.
+   */
+  private void closeStreams(Closeable... streams) {
+    // Null check avoids a NullPointerException when a null array is passed.
+    if (null != streams) {
+      for (Closeable stream : streams) {
+        if (null != stream) {
+          try {
+            stream.close();
+          } catch (IOException e) {
+            LOG.error("Error while closing stream " + stream);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the invalid update delta file names of the given segment.
+   *
+   * @param segmentId segment id
+   * @return names of the update delta files that fall outside the valid timestamp range
+   */
+  public List<String> getInvalidBlockList(String segmentId) {
+
+    // get the original fact file timestamp from the table status file.
+    List<String> listOfInvalidBlocks = new ArrayList<String>();
+    SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    LoadMetadataDetails[] segmentDetails =
+        ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    long timestampOfOriginalFacts = 0;
+
+    String startTimestampOfUpdate = "";
+    String endTimestampOfUpdate = "";
+
+    for (LoadMetadataDetails segment : segmentDetails) {
+      // find matching segment and return timestamp.
+      if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
+        timestampOfOriginalFacts = segment.getLoadStartTime();
+        startTimestampOfUpdate = segment.getUpdateDeltaStartTimestamp();
+        endTimestampOfUpdate = segment.getUpdateDeltaEndTimestamp();
+      }
+    }
+
+    if (startTimestampOfUpdate.isEmpty()) {
+      return listOfInvalidBlocks;
+    }
+
+    // After getting the original fact timestamp, the remaining files
+    // need to be cross checked against the table status file.
+
+    // filter out the fact files.
+
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    CarbonFile segDir =
+        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+
+    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimestampOfUpdate);
+    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimestampOfUpdate);
+    final Long timeStampOriginalFactFinal =
+        timestampOfOriginalFacts;
+
+    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile pathName) {
+        String fileName = pathName.getName();
+        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
+          String firstPart = fileName.substring(0, fileName.indexOf('.'));
+
+          long timestamp = Long.parseLong(firstPart
+              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                  firstPart.length()));
+          if (Long.compare(timestamp, endTimeStampFinal) <= 0
+              && Long.compare(timestamp, startTimeStampFinal) >= 0) {
+            return false;
+          }
+          if (Long.compare(timestamp, timeStampOriginalFactFinal) == 0) {
+            return false;
+          }
+          // take the rest of files as they are invalid.
+          return true;
+        }
+        return false;
+      }
+    });
+
+    // gather the invalid file names.
+    for (CarbonFile invalidFile : files) {
+      listOfInvalidBlocks.add(invalidFile.getName());
+    }
+
+    return listOfInvalidBlocks;
+  }
+
+  /**
+   * Returns the invalid timestamp range of a segment.
+   *
+   * @param segmentId segment id
+   * @return UpdateVO holding the fact and update delta timestamps of the segment
+   */
+  public UpdateVO getInvalidTimestampRange(String segmentId) {
+    UpdateVO range = new UpdateVO();
+    for (LoadMetadataDetails segment : segmentDetails) {
+      if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
+        range.setFactTimestamp(segment.getLoadStartTime());
+        if (!segment.getUpdateDeltaStartTimestamp().isEmpty() && !segment
+            .getUpdateDeltaEndTimestamp().isEmpty()) {
+          range.setUpdateDeltaStartTimestamp(
+              CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
+          range.setLatestUpdateTimestamp(
+              CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
+        }
+      }
+    }
+    return range;
+  }
+
+  /**
+   * Returns the delete delta files of a block that are older than the block's
+   * recorded start timestamp, or the complete list if requested.
+   *
+   * @param segmentId segment id
+   * @param block segment update details of the block
+   * @param needCompleteList if true, all delete delta files of the block are returned
+   * @param allSegmentFiles all files present in the segment directory
+   * @return matching delete delta files
+   */
+  public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId,
+                                                     final SegmentUpdateDetails block, final boolean needCompleteList,
+                                                     CarbonFile[] allSegmentFiles) {
+
+    final long deltaStartTimestamp =
+        getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+
+    List<CarbonFile> files =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (CarbonFile eachFile : allSegmentFiles) {
+      String fileName = eachFile.getName();
+      if (fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+        String blkName = CarbonTablePath.DataFileUtil.getBlockNameFromDeleteDeltaFile(fileName);
+
+        // complete list of delta files of that block is returned.
+        if (needCompleteList && block.getBlockName().equalsIgnoreCase(blkName)) {
+          files.add(eachFile);
+        }
+
+        // invalid delete delta files only will be returned.
+        long timestamp = CarbonUpdateUtil.getTimeStampAsLong(
+            CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName));
+
+        if (block.getBlockName().equalsIgnoreCase(blkName) && (
+            Long.compare(timestamp, deltaStartTimestamp) < 0)) {
+          files.add(eachFile);
+        }
+      }
+    }
+
+    return files.toArray(new CarbonFile[files.size()]);
+  }
+
+  /**
+   * Returns the carbondata file and the carbon index files related to a block.
+   *
+   * @param blockName block name
+   * @param allSegmentFiles all files present in the segment directory
+   * @param actualBlockName actual carbondata file name of the block
+   * @return block file and its index files
+   */
+  public CarbonFile[] getAllBlockRelatedFiles(String blockName, CarbonFile[] allSegmentFiles,
+                                              String actualBlockName) {
+    List<CarbonFile> files = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (CarbonFile eachFile : allSegmentFiles) {
+
+      // for carbon data.
+      if (eachFile.getName().equalsIgnoreCase(actualBlockName)) {
+        files.add(eachFile);
+      }
+
+      // get carbon index files of the block.
+      String taskNum = CarbonTablePath.DataFileUtil.getTaskNo(actualBlockName);
+      if (eachFile.getName().endsWith(CarbonTablePath.getCarbonIndexExtension()) && eachFile
+          .getName().startsWith(taskNum)) {
+        files.add(eachFile);
+      }
+
+    }
+
+    return files.toArray(new CarbonFile[files.size()]);
+  }
+}
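
For orientation, a minimal usage sketch of how a caller might resolve the delete delta files that apply to one tuple id through this manager; it is illustrative only and not part of the patch. The SegmentUpdateStatusManager package and its identifier-based constructor are assumptions inferred from the file layout and from how SegmentStatusManager is constructed above; only methods visible in this diff are called.

import java.util.List;

import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
// package assumed from this patch's file layout
import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;

public final class DeleteDeltaLookupSketch {

  private DeleteDeltaLookupSketch() {
  }

  /**
   * Resolves the delete delta files covering the block of the given tuple id.
   * Hypothetical helper, for illustration only.
   */
  public static List<String> deleteDeltaFilesFor(AbsoluteTableIdentifier identifier,
      String tupleId) throws Exception {
    // assumed constructor: built from the table identifier, mirroring how
    // SegmentStatusManager is constructed elsewhere in this patch
    SegmentUpdateStatusManager updateStatusManager =
        new SegmentUpdateStatusManager(identifier);
    // getDeltaFiles() parses segment id and block id out of the tuple id, verifies the
    // block exists, and returns only delta files whose trailing "-<timestamp>" falls
    // inside the block's recorded delete delta start/end window; null when none exist.
    return updateStatusManager
        .getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
  }
}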

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriter.java
new file mode 100644
index 0000000..dae0ddb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.writer;
+
+import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+
+import java.io.IOException;
+
+/**
+ * Delete delta file writer interface
+ */
+public interface CarbonDeleteDeltaWriter {
+
+  /**
+   * Writes comma separated deleted record details.
+   *
+   * @param value delete record details
+   * @throws IOException if an I/O error occurs
+   */
+  void write(String value) throws IOException;
+  void write(DeleteDeltaBlockDetails deleteBlockDetails) throws IOException;
+}
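
The CarbonDeleteDeltaWriter above is handed an already composed delete delta file path; the reader-side filters earlier in this patch then recognise such files purely by name. The sketch below illustrates that naming convention (blockName, hyphen, timestamp, extension); the helper class is hypothetical and only mirrors the parsing done in the accept() filters of SegmentUpdateStatusManager.

/**
 * Hypothetical helper illustrating the delta file name convention used by the
 * reader-side filters above: <blockName>-<timestamp><extension>.
 */
public final class DeltaFileNameSketch {

  private DeltaFileNameSketch() {
  }

  /** Extracts the block name, i.e. everything before the last hyphen. */
  public static String blockNameOf(String deltaFileName) {
    String firstPart = deltaFileName.substring(0, deltaFileName.indexOf('.'));
    return firstPart.substring(0, firstPart.lastIndexOf('-'));
  }

  /** Extracts the timestamp that follows the last hyphen. */
  public static long timestampOf(String deltaFileName) {
    String firstPart = deltaFileName.substring(0, deltaFileName.indexOf('.'));
    return Long.parseLong(firstPart.substring(firstPart.lastIndexOf('-') + 1));
  }

  /** A delta file is applicable when its timestamp lies inside the recorded window. */
  public static boolean isInWindow(String deltaFileName, long start, long end) {
    long timestamp = timestampOf(deltaFileName);
    return timestamp >= start && timestamp <= end;
  }
}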

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
new file mode 100644
index 0000000..42dc959
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.writer;
+
+import com.google.gson.Gson;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.update.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * This class is responsible for writing the delete delta file
+ */
+public class CarbonDeleteDeltaWriterImpl implements CarbonDeleteDeltaWriter {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDeleteDeltaWriterImpl.class.getName());
+
+  private String filePath;
+
+  private FileFactory.FileType fileType;
+
+  private DataOutputStream dataOutStream = null;
+
+  /**
+   * Constructor
+   *
+   * @param filePath
+   * @param fileType
+   */
+  public CarbonDeleteDeltaWriterImpl(String filePath, FileFactory.FileType fileType) {
+    this.filePath = filePath;
+    this.fileType = fileType;
+
+  }
+
+  /**
+   * This method writes the deleted records data to disk.
+   *
+   * @param value deleted records
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void write(String value) throws IOException {
+    BufferedWriter brWriter = null;
+    try {
+      FileFactory.createNewFile(filePath, fileType);
+      dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+      brWriter.write(value);
+    } catch (IOException ioe) {
+      LOGGER.error("Error message: " + ioe.getLocalizedMessage());
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      if (null != dataOutStream) {
+        dataOutStream.flush();
+      }
+      CarbonUtil.closeStreams(brWriter, dataOutStream);
+    }
+
+  }
+
+  /**
+   * This method writes the deleted records data in JSON format.
+   * @param deleteDeltaBlockDetails block details holding the deleted records
+   * @throws IOException if an I/O error occurs
+   */
+  @Override public void write(DeleteDeltaBlockDetails deleteDeltaBlockDetails) throws IOException {
+    BufferedWriter brWriter = null;
+    try {
+      FileFactory.createNewFile(filePath, fileType);
+      dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
+      Gson gsonObjectToWrite = new Gson();
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
+          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+      String deletedData = gsonObjectToWrite.toJson(deleteDeltaBlockDetails);
+      brWriter.write(deletedData);
+    } catch (IOException ioe) {
+      LOGGER.error("Error message: " + ioe.getLocalizedMessage());
+    } finally {
+      if (null != brWriter) {
+        brWriter.flush();
+      }
+      if (null != dataOutStream) {
+        dataOutStream.flush();
+      }
+      CarbonUtil.closeStreams(brWriter, dataOutStream);
+    }
+
+  }
+}
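
A minimal usage sketch, not part of the commit, showing how the String variant of the writer above might be driven. Only the constructor and write method visible in this diff are used; the target path and the helper name are assumptions for illustration.

import java.io.IOException;

import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriter;
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;

public final class DeleteDeltaWriteSketch {

  private DeleteDeltaWriteSketch() {
  }

  /**
   * Writes a comma separated list of deleted row ids to the given delete delta
   * file path. The path is an assumption here; in the real flow it is composed
   * from the block name and the current transaction timestamp.
   */
  public static void writeDeletedRows(String deleteDeltaFilePath, String deletedRowIds)
      throws IOException {
    CarbonDeleteDeltaWriter writer = new CarbonDeleteDeltaWriterImpl(
        deleteDeltaFilePath, FileFactory.getFileType(deleteDeltaFilePath));
    // write(String) creates the file and writes the value through a buffered stream
    // using the configured default stream encoding, as implemented above
    writer.write(deletedRowIds);
  }
}

The write(DeleteDeltaBlockDetails) overload follows the same pattern but serialises the block details to JSON via Gson, as the implementation above shows.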

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3e045d83/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperations.java b/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperations.java
new file mode 100644
index 0000000..a1c7a2e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/fileoperations/AtomicFileOperations.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.fileoperations;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public interface AtomicFileOperations {
+
+  DataInputStream openForRead() throws IOException;
+
+  void close() throws IOException;
+
+  DataOutputStream openForWrite(FileWriteOperation operation) throws IOException;
+}
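
For context, a hedged sketch of how these atomic file operations are used elsewhere in this patch: a status file is rewritten in full through openForWrite(FileWriteOperation.OVERWRITE) and the new content is expected to become visible only once close() succeeds. The sketch class name is hypothetical, and FileWriteOperation is assumed to live in the same package as the interface, since it is referenced here without an import.

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.fileoperations.AtomicFileOperations;
import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.fileoperations.FileWriteOperation;

public final class AtomicStatusWriteSketch {

  private AtomicStatusWriteSketch() {
  }

  /** Overwrites a small status file, mirroring writeLoadDetailsIntoFile above. */
  public static void overwriteStatusFile(String statusFilePath, String content)
      throws IOException {
    AtomicFileOperations fileWrite =
        new AtomicFileOperationsImpl(statusFilePath, FileFactory.getFileType(statusFilePath));
    BufferedWriter brWriter = null;
    try {
      DataOutputStream dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
      brWriter.write(content);
      brWriter.flush();
    } finally {
      CarbonUtil.closeStreams(brWriter);
      // close() is where the new content is expected to replace the old file
      fileWrite.close();
    }
  }
}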


