carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [20/52] [partial] incubator-carbondata git commit: move core package
Date Mon, 16 Jan 2017 14:52:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
deleted file mode 100644
index 2c7abbf..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
+++ /dev/null
@@ -1,797 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.update;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO;
-import org.apache.carbondata.common.iudprocessor.iuddata.RowCountDetailsVO;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
-import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.locks.ICarbonLock;
-
-
-/**
- * This class contains all update utility methods
- */
-public class CarbonUpdateUtil {
-
-  private static final LogService LOGGER =
-          LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
-
-  /**
-   * returns required filed from tuple id
-   *
-   * @param Tid
-   * @param tid
-   * @return
-   */
-  public static String getRequiredFieldFromTID(String Tid, TupleIdEnum tid) {
-    return Tid.split("/")[tid.getTupleIdIndex()];
-  }
-
-  /**
-   * returns segment along with block id
-   * @param Tid
-   * @return
-   */
-  public static String getSegmentWithBlockFromTID(String Tid) {
-    return getRequiredFieldFromTID(Tid, TupleIdEnum.SEGMENT_ID)
-        + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid, TupleIdEnum.BLOCK_ID);
-  }
-
-  /**
-   * Returns block path from tuple id
-   *
-   * @param tid
-   * @param factPath
-   * @return
-   */
-  public static String getTableBlockPath(String tid, String factPath) {
-    String part =
-            CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
-    String segment =
-            CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
-    return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
-            + CarbonCommonConstants.FILE_SEPARATOR + segment;
-
-  }
-
-  /**
-   * returns delete delta file path
-   *
-   * @param blockPath
-   * @param blockPath
-   * @param timestamp
-   * @return
-   */
-  public static String getDeleteDeltaFilePath(String blockPath, String blockName,
-                                              String timestamp) {
-    return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
-        + CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
-
-  }
-
-  /**
-   * @param updateDetailsList
-   * @param table
-   * @param updateStatusFileIdentifier
-   * @return
-   */
-  public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
-      CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
-    boolean status = false;
-    SegmentUpdateStatusManager segmentUpdateStatusManager =
-            new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
-    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
-    boolean lockStatus = false;
-
-    try {
-      lockStatus = updateLock.lockWithRetries();
-      if (lockStatus) {
-
-        AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
-        CarbonTablePath carbonTablePath = CarbonStorePath
-                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                        absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        // read the existing file if present and update the same.
-        SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager
-                .getUpdateStatusDetails();
-
-        List<SegmentUpdateDetails> oldList = new ArrayList(Arrays.asList(oldDetails));
-
-        for (SegmentUpdateDetails newBlockEntry : updateDetailsList) {
-          int index = oldList.indexOf(newBlockEntry);
-          if (index != -1) {
-            // update the element in existing list.
-            SegmentUpdateDetails blockDetail = oldList.get(index);
-            if(blockDetail.getDeleteDeltaStartTimestamp().isEmpty() ||
-                    (isCompaction == true)) {
-              blockDetail
-                      .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
-            }
-            blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
-            blockDetail.setStatus(newBlockEntry.getStatus());
-            blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
-          } else {
-            // add the new details to the list.
-            oldList.add(newBlockEntry);
-          }
-        }
-
-        segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, updateStatusFileIdentifier);
-        status = true;
-      } else {
-        LOGGER.error("Not able to acquire the segment update lock.");
-        status = false;
-      }
-    } catch (IOException e) {
-      status = false;
-    } finally {
-      if (lockStatus) {
-        if (updateLock.unlock()) {
-          LOGGER.info("Unlock the segment update lock successfull.");
-        } else {
-          LOGGER.error("Not able to unlock the segment update lock.");
-        }
-      }
-    }
-    return status;
-  }
-
-  /**
-   *
-   * @param updatedSegmentsList
-   * @param table
-   * @param updatedTimeStamp
-   * @param isTimestampUpdationRequired
-   * @param segmentsToBeDeleted
-   * @return
-   */
-  public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
-                                                  CarbonTable table, String updatedTimeStamp,
-                                                  boolean isTimestampUpdationRequired,
-                                                  List<String> segmentsToBeDeleted) {
-
-    boolean status = false;
-
-    String metaDataFilepath = table.getMetaDataFilepath();
-
-    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
-
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-    boolean lockStatus = false;
-    try {
-      lockStatus = carbonLock.lockWithRetries();
-      if (lockStatus) {
-        LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
-                        + " for table status updation");
-
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-
-          if (isTimestampUpdationRequired) {
-            // we are storing the link between the 2 status files in the segment 0 only.
-            if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
-              loadMetadata.setUpdateStatusFileName(
-                      CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
-            }
-
-            // if the segments is in the list of marked for delete then update the status.
-            if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
-              loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
-              loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
-            }
-          }
-          for (String segName : updatedSegmentsList) {
-            if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) {
-              // if this call is coming from the delete delta flow then the time stamp
-              // String will come empty then no need to write into table status file.
-              if (isTimestampUpdationRequired) {
-                loadMetadata.setIsDeleted(CarbonCommonConstants.KEYWORD_TRUE);
-                // if in case of update flow.
-                if (loadMetadata.getUpdateDeltaStartTimestamp().isEmpty()) {
-                  // this means for first time it is getting updated .
-                  loadMetadata.setUpdateDeltaStartTimestamp(updatedTimeStamp);
-                }
-                // update end timestamp for each time.
-                loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
-              }
-            }
-          }
-        }
-
-        try {
-          segmentStatusManager
-                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
-        } catch (IOException e) {
-          return false;
-        }
-
-        status = true;
-      } else {
-        LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
-      }
-    } finally {
-      if (lockStatus) {
-        if (carbonLock.unlock()) {
-          LOGGER.info(
-                 "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
-        } else {
-          LOGGER.error(
-                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
-        }
-      }
-    }
-    return status;
-
-  }
-
-  /**
-   * gets the file name of the update status file. by appending the latest timestamp to it.
-   *
-   * @param updatedTimeStamp
-   * @return
-   */
-  public static String getUpdateStatusFileName(String updatedTimeStamp) {
-    return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN
-            + updatedTimeStamp;
-  }
-
-  /**
-   * This will handle the clean up cases if the update fails.
-   *
-   * @param table
-   * @param timeStamp
-   */
-  public static void cleanStaleDeltaFiles(CarbonTable table, final String timeStamp) {
-
-    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
-    // as of now considering only partition 0.
-    String partitionId = "0";
-    String partitionDir = carbonTablePath.getPartitionDir(partitionId);
-    CarbonFile file =
-            FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
-    if(!file.exists()) {
-      return;
-    }
-    for (CarbonFile eachDir : file.listFiles()) {
-      // for each dir check if the file with the delta timestamp is present or not.
-      CarbonFile[] toBeDeleted = eachDir.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          String fileName = file.getName();
-          return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)
-                  || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
-                  || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
-        }
-      });
-      // deleting the files of a segment.
-      try {
-        CarbonUtil.deleteFoldersAndFilesSilent(toBeDeleted);
-      } catch (IOException e) {
-        LOGGER.error("Exception in deleting the delta files." + e);
-      } catch (InterruptedException e) {
-        LOGGER.error("Exception in deleting the delta files." + e);
-      }
-    }
-  }
-
-  /**
-   * returns timestamp as long value
-   *
-   * @param timtstamp
-   * @return
-   */
-  public static Long getTimeStampAsLong(String timtstamp) {
-    try {
-      long longValue = Long.parseLong(timtstamp);
-      return longValue;
-    } catch (NumberFormatException nfe) {
-      String errorMsg = "Invalid timestamp : " + timtstamp;
-      LOGGER.error(errorMsg);
-      return null;
-    }
-  }
-
-  /**
-   * returns integer value from given string
-   *
-   * @param value
-   * @return
-   * @throws Exception
-   */
-  public static Integer getIntegerValue(String value) throws Exception {
-    try {
-      int intValue = Integer.parseInt(value);
-      return intValue;
-    } catch (NumberFormatException nfe) {
-      LOGGER.error("Invalid row : " + value + nfe.getLocalizedMessage());
-      throw new Exception("Invalid row : " + nfe.getLocalizedMessage());
-    }
-  }
-
-  /**
-   * return only block name from completeBlockName
-   *
-   * @param completeBlockName
-   * @return
-   */
-  public static String getBlockName(String completeBlockName) {
-    String blockName =
-        completeBlockName.substring(0, completeBlockName.lastIndexOf(CarbonCommonConstants.HYPHEN));
-    return blockName;
-  }
-
-  /**
-   * returns segment id from segment name
-   *
-   * @param segmentName
-   * @return
-   */
-  public static String getSegmentId(String segmentName) {
-    String id = segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
-    return id;
-  }
-
-  public static int getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
-    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
-
-    // scan all the carbondata files and get the latest task ID.
-    CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
-    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
-      }
-    });
-    int max = 0;
-    if (null != dataFiles) {
-      for (CarbonFile file : dataFiles) {
-        int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
-        if (taskNumber > max) {
-          max = taskNumber;
-        }
-      }
-    }
-    // return max task No
-    return max;
-
-  }
-
-  public static String getLatestBlockNameForSegment(String segmentId, CarbonTablePath tablePath) {
-    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
-
-    // scan all the carbondata files and get the latest task ID.
-    CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
-
-    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        int max = 0;
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          int taskNumber = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()));
-          if (taskNumber >= max) {
-            return true;
-          }
-        }
-        return false;
-      }
-    });
-
-    // get the latest among the data files. highest task number will be at the last.
-    return dataFiles[dataFiles.length - 1].getName();
-  }
-
-  /**
-   * This method will convert a given timestamp to long value and then to string back
-   *
-   * @param factTimeStamp
-   * @return
-   */
-  public static String convertTimeStampToString(String factTimeStamp) {
-    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(factTimeStamp);
-      return Long.toString(dateToStr.getTime());
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
-      return null;
-    }
-  }
-
-  /**
-   * This method will convert a given timestamp to long value and then to string back
-   *
-   * @param factTimeStamp
-   * @return
-   */
-  public static long convertTimeStampToLong(String factTimeStamp) {
-    SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
-    Date dateToStr = null;
-    try {
-      dateToStr = parser.parse(factTimeStamp);
-      return dateToStr.getTime();
-    } catch (ParseException e) {
-      LOGGER.error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e.getMessage());
-      parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      try {
-        dateToStr = parser.parse(factTimeStamp);
-        return dateToStr.getTime();
-      } catch (ParseException e1) {
-        LOGGER
-            .error("Cannot convert" + factTimeStamp + " to Time/Long type value" + e1.getMessage());
-        return 0;
-      }
-    }
-  }
-
-
-  /**
-   * Handling of the clean up of old carbondata files, index files , delte delta,
-   * update status files.
-   * @param table clean up will be handled on this table.
-   * @param forceDelete if true then max query execution timeout will not be considered.
-   */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) {
-
-    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
-                    table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
-
-    LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
-
-    String validUpdateStatusFile = "";
-
-    // scan through each segment.
-
-    for (LoadMetadataDetails segment : details) {
-
-      // take the update status file name from 0th segment.
-      validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-
-      // if this segment is valid then only we will go for delta file deletion.
-      // if the segment is mark for delete or compacted then any way it will get deleted.
-
-      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              || segment.getLoadStatus()
-              .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
-
-        // take the list of files from this segment.
-        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
-        CarbonFile segDir =
-                FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-        CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-        // scan through the segment and find the carbondatafiles and index files.
-        SegmentUpdateStatusManager updateStatusManager =
-                new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
-
-        // get Invalid update  delta files.
-        CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-                .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                        CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
-
-        // now for each invalid delta file need to check the query execution time out
-        // and then delete.
-
-        for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
-
-          compareTimestampsAndDelete(invalidFile, forceDelete, false);
-        }
-
-        // do the same for the index files.
-        CarbonFile[] invalidIndexFiles = updateStatusManager
-                .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                        CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
-
-        // now for each invalid index file need to check the query execution time out
-        // and then delete.
-
-        for (CarbonFile invalidFile : invalidIndexFiles) {
-
-          compareTimestampsAndDelete(invalidFile, forceDelete, false);
-        }
-
-        // now handle all the delete delta files which needs to be deleted.
-        // there are 2 cases here .
-        // 1. if the block is marked as compacted then the corresponding delta files
-        //    can be deleted if query exec timeout is done.
-        // 2. if the block is in success state then also there can be delete
-        //    delta compaction happened and old files can be deleted.
-
-        SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-        for (SegmentUpdateDetails block : updateDetails) {
-          CarbonFile[] completeListOfDeleteDeltaFiles;
-          CarbonFile[] invalidDeleteDeltaFiles;
-
-          if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-            continue;
-          }
-
-          // case 1
-          if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
-            completeListOfDeleteDeltaFiles = updateStatusManager
-                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
-                            allSegmentFiles);
-            for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
-
-              compareTimestampsAndDelete(invalidFile, forceDelete, false);
-            }
-
-            CarbonFile[] blockRelatedFiles = updateStatusManager
-                    .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
-                            block.getActualBlockName());
-
-            // now for each invalid index file need to check the query execution time out
-            // and then delete.
-
-            for (CarbonFile invalidFile : blockRelatedFiles) {
-
-              compareTimestampsAndDelete(invalidFile, forceDelete, false);
-            }
-
-
-          } else {
-            invalidDeleteDeltaFiles = updateStatusManager
-                    .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
-                            allSegmentFiles);
-            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-
-              compareTimestampsAndDelete(invalidFile, forceDelete, false);
-            }
-          }
-        }
-      }
-    }
-
-    // delete the update table status files which are old.
-    if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
-
-      final String updateStatusTimestamp = validUpdateStatusFile
-              .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
-
-      CarbonFile metaFolder = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath(),
-              FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
-
-      CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
-
-            // CHECK if this is valid or not.
-            // we only send invalid ones to delete.
-            if (!file.getName().endsWith(updateStatusTimestamp)) {
-              return true;
-            }
-          }
-          return false;
-        }
-      });
-
-      for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
-
-        compareTimestampsAndDelete(invalidFile, forceDelete, true);
-      }
-    }
-  }
-
-  /**
-   * This will tell whether the max query timeout has been expired or not.
-   * @param fileTimestamp
-   * @return
-   */
-  public static boolean isMaxQueryTimeoutExceeded(long fileTimestamp) {
-    // record current time.
-    long currentTime = CarbonUpdateUtil.readCurrentTime();
-    int maxTime;
-    try {
-      maxTime = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
-    } catch (NumberFormatException e) {
-      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
-    }
-
-    long difference = currentTime - fileTimestamp;
-
-    long minutesElapsed = (difference / (1000 * 60));
-
-    return minutesElapsed > maxTime;
-
-  }
-
-  /**
-   *
-   * @param invalidFile
-   * @param forceDelete
-   * @param isUpdateStatusFile if true then the parsing of file name logic changes.
-   */
-  private static void compareTimestampsAndDelete(CarbonFile invalidFile,
-                                                 boolean forceDelete, boolean isUpdateStatusFile) {
-    long fileTimestamp = 0L;
-
-    if (isUpdateStatusFile) {
-      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName()
-              .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
-    } else {
-      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(
-              CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
-    }
-
-    // if the timestamp of the file is more than the current time by query execution timeout.
-    // then delete that file.
-    if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
-      // delete the files.
-      try {
-        LOGGER.info("deleting the invalid file : " + invalidFile.getName());
-        CarbonUtil.deleteFoldersAndFiles(invalidFile);
-      } catch (IOException e) {
-        LOGGER.error("error in clean up of compacted files." + e.getMessage());
-      } catch (InterruptedException e) {
-        LOGGER.error("error in clean up of compacted files." + e.getMessage());
-      }
-    }
-  }
-
-  /**
-   *
-   * @param blockStatus
-   * @return
-   */
-  public static boolean isBlockInvalid(String blockStatus) {
-    if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
-            .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * This will return the current time in millis.
-   * @return
-   */
-  public static long readCurrentTime() {
-    return System.currentTimeMillis();
-  }
-
-  /**
-   *
-   * @param details
-   * @param segmentBlockCount
-   */
-  public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
-                                                Map<String, Long> segmentBlockCount) {
-
-    String segId = details.getSegmentName();
-
-    segmentBlockCount.put(details.getSegmentName(), segmentBlockCount.get(segId) - 1);
-
-  }
-
-  /**
-   *
-   * @param segmentBlockCount
-   * @return
-   */
-  public static List<String> getListOfSegmentsToMarkDeleted( Map<String, Long> segmentBlockCount) {
-    List<String> segmentsToBeDeleted =
-            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
-
-      if (eachSeg.getValue() == 0) {
-        segmentsToBeDeleted.add(eachSeg.getKey());
-      }
-
-    }
-    return segmentsToBeDeleted;
-  }
-
-  /**
-   *
-   * @param blockMappingVO
-   * @param segmentUpdateStatusManager
-   */
-  public static void createBlockDetailsMap(BlockMappingVO blockMappingVO,
-                                           SegmentUpdateStatusManager segmentUpdateStatusManager) {
-
-    Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
-
-    Map<String, RowCountDetailsVO> outputMap =
-            new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
-      String key = blockRowEntry.getKey();
-      long alreadyDeletedCount = 0;
-
-      SegmentUpdateDetails detail = segmentUpdateStatusManager.getDetailsForABlock(key);
-
-      if (null != detail) {
-
-        alreadyDeletedCount = Long.parseLong(detail.getDeletedRowsInBlock());
-
-      }
-
-      RowCountDetailsVO rowCountDetailsVO =
-              new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
-      outputMap.put(key, rowCountDetailsVO);
-
-    }
-
-    blockMappingVO.setCompleteBlockRowDetailVO(outputMap);
-
-  }
-
-  /**
-   *
-   * @param segID
-   * @param blockName
-   * @return
-   */
-  public static String getSegmentBlockNameKey(String segID, String blockName) {
-
-    String blockNameWithOutPart = blockName
-            .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
-                    blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
-
-    return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java b/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java
deleted file mode 100644
index 644c5c1..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockDetails.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.update;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-/**
- * This class stores the block details of delete delta file
- */
-public class DeleteDeltaBlockDetails implements Serializable {
-
-  private static final long serialVersionUID = 1206104914918495724L;
-
-  private List<DeleteDeltaBlockletDetails> blockletDetails;
-  private String blockName;
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteDeltaBlockDetails.class.getName());
-
-  public DeleteDeltaBlockDetails(String blockName) {
-    this.blockName = blockName;
-    blockletDetails = new ArrayList<DeleteDeltaBlockletDetails>();
-  }
-
-  public String getBlockName() {
-    return blockName;
-  }
-
-  public void setBlockName(String blockName) {
-    this.blockName = blockName;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) return true;
-    if (obj == null || !(obj instanceof DeleteDeltaBlockDetails)) return false;
-
-    DeleteDeltaBlockDetails that = (DeleteDeltaBlockDetails) obj;
-
-    return blockName.equals(that.blockName);
-
-  }
-
-  @Override public int hashCode() {
-    return blockName.hashCode();
-  }
-
-  public List<DeleteDeltaBlockletDetails> getBlockletDetails() {
-    return blockletDetails;
-  }
-
-  public boolean addBlockletDetails(DeleteDeltaBlockletDetails blocklet) {
-    int index = blockletDetails.indexOf(blocklet);
-    if (blockletDetails.isEmpty() || index == -1) {
-      return blockletDetails.add(blocklet);
-    } else {
-      return blockletDetails.get(index).addDeletedRows(blocklet.getDeletedRows());
-    }
-  }
-
-  public boolean addBlocklet(String blockletId, String offset) throws Exception {
-    DeleteDeltaBlockletDetails blocklet = new DeleteDeltaBlockletDetails(blockletId);
-    try {
-      blocklet.addDeletedRow(CarbonUpdateUtil.getIntegerValue(offset));
-      return addBlockletDetails(blocklet);
-    } catch (Exception e) {
-      LOGGER.debug(e.getMessage());
-      throw e;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java b/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java
deleted file mode 100644
index d3976f4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/DeleteDeltaBlockletDetails.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.update;
-
-import java.io.Serializable;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-/**
- * This class stores the blocklet details of delete delta file
- */
-public class DeleteDeltaBlockletDetails implements Serializable {
-
-  private static final long serialVersionUID = 1206104914911491724L;
-  private String id;
-  private Set<Integer> deletedRows;
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteDeltaBlockletDetails.class.getName());
-
-  public DeleteDeltaBlockletDetails(String id) {
-    this.id = id;
-    deletedRows = new TreeSet<Integer>();
-  }
-
-  public boolean addDeletedRows(Set<Integer> rows) {
-    return deletedRows.addAll(rows);
-  }
-
-  public boolean addDeletedRow(Integer row) {
-    return deletedRows.add(row);
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public void setId(String id) {
-    this.id = id;
-  }
-
-  public Set<Integer> getDeletedRows() {
-    return deletedRows;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || !(obj instanceof DeleteDeltaBlockletDetails)) {
-      return false;
-    }
-
-    DeleteDeltaBlockletDetails that = (DeleteDeltaBlockletDetails) obj;
-    return id.equals(that.id);
-  }
-
-  @Override public int hashCode() {
-    return id.hashCode();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java b/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java
deleted file mode 100644
index e485ff2..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/SegmentUpdateDetails.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.update;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-/**
- * This class stores the segment details of table update status file
- */
-public class SegmentUpdateDetails implements Serializable {
-
-  private static final long serialVersionUID = 1206104914918491724L;
-  private String segmentName;
-  private String blockName;
-  private String status = "";
-  private String deleteDeltaEndTimestamp = "";
-  private String deleteDeltaStartTimestamp = "";
-  private String actualBlockName;
-  private String deletedRowsInBlock = "0";
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SegmentUpdateDetails.class.getName());
-
-  public String getDeleteDeltaEndTimestamp() {
-    return deleteDeltaEndTimestamp;
-  }
-
-  public void setDeleteDeltaEndTimestamp(String deleteDeltaEndTimestamp) {
-    this.deleteDeltaEndTimestamp = deleteDeltaEndTimestamp;
-  }
-
-  public String getSegmentName() {
-    return segmentName;
-  }
-
-  public void setSegmentName(String segmentName) {
-    this.segmentName = segmentName;
-  }
-
-  public String getBlockName() {
-    return blockName;
-  }
-
-  public void setBlockName(String blockName) {
-    this.blockName = blockName;
-  }
-
-  public String getDeleteDeltaStartTimestamp() {
-    return deleteDeltaStartTimestamp;
-  }
-
-  public void setDeleteDeltaStartTimestamp(String deleteDeltaStartTimestamp) {
-    this.deleteDeltaStartTimestamp = deleteDeltaStartTimestamp;
-  }
-
-  public void setStatus(String status) { this.status = status;}
-
-  public String getStatus() {return this.status;}
-
-  @Override public int hashCode() {
-    final int prime = 31;
-    int result = segmentName.hashCode();
-    result = prime * result + blockName.hashCode();
-    return result;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-
-    }
-    if (!(obj instanceof SegmentUpdateDetails)) {
-      return false;
-    }
-    SegmentUpdateDetails other = (SegmentUpdateDetails) obj;
-    if (segmentName == null) {
-      if (other.segmentName != null) {
-        return false;
-      }
-    } else if (!segmentName.equals(other.segmentName)) {
-      return false;
-    }
-    if (blockName == null) {
-      if (other.blockName != null) {
-        return false;
-      }
-    } else if (!blockName.equals(other.blockName)) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * return deleteDeltaTime as long
-   *
-   * @return
-   */
-  public long getDeleteDeltaEndTimeAsLong() {
-    return getTimeStampAsLong(deleteDeltaEndTimestamp);
-  }
-
-  /**
-   * return deleteDeltaTime as long
-   *
-   * @return
-   */
-  public long getDeleteDeltaStartTimeAsLong() {
-
-    return getTimeStampAsLong(deleteDeltaStartTimestamp);
-  }
-
-  /**
-   * returns complete block name
-   *
-   * @return
-   */
-  public String getActualBlockName() {
-    return actualBlockName;
-  }
-
-  public void setActualBlockName(String actualBlockName) {
-    this.actualBlockName = actualBlockName;
-  }
-
-  /**
-   * returns timestamp as long value
-   *
-   * @param timtstamp
-   * @return
-   */
-  private Long getTimeStampAsLong(String timtstamp) {
-    long longValue = 0;
-    try {
-      longValue = Long.parseLong(timtstamp);
-    } catch (NumberFormatException nfe) {
-      String errorMsg = "Invalid timestamp : " + timtstamp;
-      LOGGER.debug(errorMsg);
-    }
-    return longValue;
-  }
-
-  public String getDeletedRowsInBlock() {
-    return deletedRowsInBlock;
-  }
-
-  public void setDeletedRowsInBlock(String deletedRowsInBlock) {
-    this.deletedRowsInBlock = deletedRowsInBlock;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java b/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
deleted file mode 100644
index 2a38ed8..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/TupleIdEnum.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.update;
-
-/**
- * Enum class for tupleID.
- */
-public enum TupleIdEnum {
-  PART_ID(0),
-  SEGMENT_ID(1),
-  BLOCK_ID(2),
-  BLOCKLET_ID(3),
-  OFFSET(4);
-
-  private int index;
-
-  TupleIdEnum(int index) {
-    this.index = index;
-  }
-
-  public int getTupleIdIndex(){
-    return this.index;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
deleted file mode 100644
index ad744c4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.update;
-
-import java.io.Serializable;
-
-/**
- * VO class for storing details related to Update operation.
- */
-public class UpdateVO implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private Long factTimestamp;
-
-  private Long updateDeltaStartTimestamp;
-
-  private String segmentId;
-
-  public Long getLatestUpdateTimestamp() {
-    return latestUpdateTimestamp;
-  }
-
-  public void setLatestUpdateTimestamp(Long latestUpdateTimestamp) {
-    this.latestUpdateTimestamp = latestUpdateTimestamp;
-  }
-
-  private Long latestUpdateTimestamp;
-
-  public Long getFactTimestamp() {
-    return factTimestamp;
-  }
-
-  public void setFactTimestamp(Long factTimestamp) {
-    this.factTimestamp = factTimestamp;
-  }
-
-  public Long getUpdateDeltaStartTimestamp() {
-    return updateDeltaStartTimestamp;
-  }
-
-  public void setUpdateDeltaStartTimestamp(Long updateDeltaStartTimestamp) {
-    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    UpdateVO updateVO = (UpdateVO) o;
-    if (factTimestamp != null ?
-        !factTimestamp.equals(updateVO.factTimestamp) :
-        updateVO.factTimestamp != null) {
-      return false;
-    }
-    if (updateDeltaStartTimestamp != null ?
-        !updateDeltaStartTimestamp.equals(updateVO.updateDeltaStartTimestamp) :
-        updateVO.updateDeltaStartTimestamp != null) {
-      return false;
-    }
-    return latestUpdateTimestamp != null ?
-        latestUpdateTimestamp.equals(updateVO.latestUpdateTimestamp) :
-        updateVO.latestUpdateTimestamp == null;
-
-  }
-
-  @Override public int hashCode() {
-    int result = factTimestamp != null ? factTimestamp.hashCode() : 0;
-    result = 31 * result + (updateDeltaStartTimestamp != null ?
-        updateDeltaStartTimestamp.hashCode() :
-        0);
-    result = 31 * result + (latestUpdateTimestamp != null ? latestUpdateTimestamp.hashCode() : 0);
-    return result;
-  }
-
-  /**
-   * This will return the update timestamp if its present or it will return the fact timestamp.
-   * @return
-   */
-  public Long getCreatedOrUpdatedTimeStamp() {
-    if (null == latestUpdateTimestamp) {
-      return factTimestamp;
-    }
-    return latestUpdateTimestamp;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
deleted file mode 100644
index cfba54d..0000000
--- a/core/src/main/java/org/apache/carbondata/core/updatestatus/SegmentStatusManager.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.updatestatus;
-
-import java.io.*;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.update.CarbonUpdateUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.fileoperations.FileWriteOperation;
-import org.apache.carbondata.locks.CarbonLockFactory;
-import org.apache.carbondata.locks.CarbonLockUtil;
-import org.apache.carbondata.locks.ICarbonLock;
-import org.apache.carbondata.locks.LockUsage;
-
-import com.google.gson.Gson;
-
-/**
- * Manages Load/Segment status
- */
-public class SegmentStatusManager {
-
-  private static final LogService LOG =
-      LogServiceFactory.getLogService(SegmentStatusManager.class.getName());
-
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-  public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    this.absoluteTableIdentifier = absoluteTableIdentifier;
-  }
-
-  /**
-   * This will return the lock object used to lock the table status file before updation.
-   *
-   * @return
-   */
-  public ICarbonLock getTableStatusLock() {
-    return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
-            LockUsage.TABLE_STATUS_LOCK);
-  }
-
-  /**
-   * This method will return last modified time of tablestatus file
-   */
-  public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier)
-      throws IOException {
-    String tableStatusPath = CarbonStorePath
-        .getCarbonTablePath(identifier.getStorePath(), identifier.getCarbonTableIdentifier())
-        .getTableStatusFilePath();
-    if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
-      return 0L;
-    } else {
-      return FileFactory.getCarbonFile(tableStatusPath, FileFactory.getFileType(tableStatusPath))
-          .getLastModifiedTime();
-    }
-  }
-
-  public List<Long> getUpdateDeltaStartEndTimeStamp(final String loadIds,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray) {
-    List<Long> updateDeltaStartEndTimestamp = new ArrayList<>();
-    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-      if (loadIds.equalsIgnoreCase(loadMetadata.getLoadName())) {
-        // Make sure the Load is not compacted and not marked for delete.
-        if (CarbonCommonConstants.COMPACTED
-                .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
-          return null;
-        } else if (CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
-          return null;
-        }
-        else {
-          return updateDeltaStartEndTimestamp;
-        }
-      }
-    }
-    return null;
-  }
-
-
-  /**
-   * get valid segment for given table
-   *
-   * @return
-   * @throws IOException
-   */
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
-
-    // @TODO: move reading LoadStatus file to separate class
-    List<String> listOfValidSegments = new ArrayList<String>(10);
-    List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
-    List<String> listOfInvalidSegments = new ArrayList<String>(10);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
-    String dataPath = carbonTablePath.getTableStatusFilePath();
-    DataInputStream dataInputStream = null;
-    Gson gsonObjectToRead = new Gson();
-    AtomicFileOperations fileOperation =
-            new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
-    LoadMetadataDetails[] loadFolderDetailsArray;
-    try {
-      if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
-        dataInputStream = fileOperation.openForRead();
-        BufferedReader buffReader =
-                new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
-        loadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
-        //just directly iterate Array
-        List<LoadMetadataDetails> loadFolderDetails = Arrays.asList(loadFolderDetailsArray);
-        for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) {
-          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.MARKED_FOR_UPDATE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-            // check for merged loads.
-            if (null != loadMetadataDetails.getMergedLoadName()) {
-              if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
-                listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
-              }
-              // if merged load is updated then put it in updated list
-              if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                      .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-                listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
-              }
-              continue;
-            }
-
-            if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                    .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-
-              listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
-            }
-            listOfValidSegments.add(loadMetadataDetails.getLoadName());
-          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.COMPACTED
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.MARKED_FOR_DELETE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
-            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
-          }
-        }
-      }
-    } catch (IOException e) {
-      LOG.error(e);
-      throw e;
-    } finally {
-      try {
-        if (null != dataInputStream) {
-          dataInputStream.close();
-        }
-      } catch (Exception e) {
-        LOG.error(e);
-        throw e;
-      }
-    }
-    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
-            listOfInvalidSegments);
-  }
-
-  /**
-   * This method reads the load metadata file
-   *
-   * @param tableFolderPath
-   * @return
-   */
-  public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
-    Gson gsonObjectToRead = new Gson();
-    DataInputStream dataInputStream = null;
-    BufferedReader buffReader = null;
-    InputStreamReader inStream = null;
-    String metadataFileName = tableFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.LOADMETADATA_FILENAME;
-    LoadMetadataDetails[] listOfLoadFolderDetailsArray;
-    AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(metadataFileName, FileFactory.getFileType(metadataFileName));
-
-    try {
-      if (!FileFactory.isFileExist(metadataFileName, FileFactory.getFileType(metadataFileName))) {
-        return new LoadMetadataDetails[0];
-      }
-      dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-      buffReader = new BufferedReader(inStream);
-      listOfLoadFolderDetailsArray =
-          gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
-    } catch (IOException e) {
-      return new LoadMetadataDetails[0];
-    } finally {
-      closeStreams(buffReader, inStream, dataInputStream);
-    }
-
-    return listOfLoadFolderDetailsArray;
-  }
-
-  /**
-   * compares two given date strings
-   *
-   * @param loadValue
-   * @param userValue
-   * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
-   * 0 otherwise
-   */
-  private static Integer compareDateValues(Long loadValue, Long userValue) {
-    return loadValue.compareTo(userValue);
-  }
-
-  /**
-   * updates deletion status
-   *
-   * @param loadIds
-   * @param tableFolderPath
-   * @return
-   */
-  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
-      List<String> loadIds, String tableFolderPath) throws Exception {
-    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
-    ICarbonLock carbonDeleteSegmentLock =
-        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
-    ICarbonLock carbonTableStatusLock =
-        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK);
-    String tableDetails =
-        carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName();
-    List<String> invalidLoadIds = new ArrayList<String>(0);
-    try {
-      if (carbonDeleteSegmentLock.lockWithRetries()) {
-        LOG.info("Delete segment lock has been successfully acquired");
-
-        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
-        String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-        if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
-          // log error.
-          LOG.error("Load metadata file is not present.");
-          return loadIds;
-        }
-        // read existing metadata details in load metadata.
-        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
-        if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
-          if (invalidLoadIds.isEmpty()) {
-            // All or None , if anything fails then dont write
-            if(carbonTableStatusLock.lockWithRetries()) {
-              LOG.info("Table status lock has been successfully acquired");
-              // To handle concurrency scenarios, always take latest metadata before writing
-              // into status file.
-              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
-              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
-                  latestLoadMetadataDetails);
-              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
-            }
-            else {
-              String errorMsg = "Delete segment by id is failed for " + tableDetails
-                  + ". Not able to acquire the table status lock due to other operation running "
-                  + "in the background.";
-              LOG.audit(errorMsg);
-              LOG.error(errorMsg);
-              throw new Exception(errorMsg + " Please try after some time.");
-            }
-
-          } else {
-            return invalidLoadIds;
-          }
-
-        } else {
-          LOG.audit("Delete segment by Id is failed. No matching segment id found.");
-          return loadIds;
-        }
-
-      } else {
-        String errorMsg = "Delete segment by id is failed for " + tableDetails
-            + ". Not able to acquire the delete segment lock due to another delete "
-            + "operation is running in the background.";
-        LOG.audit(errorMsg);
-        LOG.error(errorMsg);
-        throw new Exception(errorMsg + " Please try after some time.");
-      }
-    } catch (IOException e) {
-      LOG.error("IOException" + e.getMessage());
-    } finally {
-      CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
-      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
-    }
-
-    return invalidLoadIds;
-  }
-
-  /**
-   * updates deletion status
-   *
-   * @param loadDate
-   * @param tableFolderPath
-   * @return
-   */
-  public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
-      String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
-    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
-    ICarbonLock carbonDeleteSegmentLock =
-        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
-    ICarbonLock carbonTableStatusLock =
-        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK);
-    String tableDetails =
-        carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName();
-    List<String> invalidLoadTimestamps = new ArrayList<String>(0);
-    try {
-      if (carbonDeleteSegmentLock.lockWithRetries()) {
-        LOG.info("Delete segment lock has been successfully acquired");
-
-        CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-            identifier.getStorePath(), identifier.getCarbonTableIdentifier());
-        String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-
-        if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
-          // log error.
-          LOG.error("Error message: " + "Load metadata file is not present.");
-          invalidLoadTimestamps.add(loadDate);
-          return invalidLoadTimestamps;
-        }
-        // read existing metadata details in load metadata.
-        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
-        if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps,
-              loadStartTime);
-          if (invalidLoadTimestamps.isEmpty()) {
-            if(carbonTableStatusLock.lockWithRetries()) {
-              LOG.info("Table status lock has been successfully acquired.");
-              // To handle concurrency scenarios, always take latest metadata before writing
-              // into status file.
-              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
-              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
-                  latestLoadMetadataDetails);
-              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
-            }
-            else {
-
-              String errorMsg = "Delete segment by date is failed for " + tableDetails
-                  + ". Not able to acquire the table status lock due to other operation running "
-                  + "in the background.";
-              LOG.audit(errorMsg);
-              LOG.error(errorMsg);
-              throw new Exception(errorMsg + " Please try after some time.");
-
-            }
-          } else {
-            return invalidLoadTimestamps;
-          }
-
-        } else {
-          LOG.audit("Delete segment by date is failed. No matching segment found.");
-          invalidLoadTimestamps.add(loadDate);
-          return invalidLoadTimestamps;
-        }
-
-      } else {
-        String errorMsg = "Delete segment by date is failed for " + tableDetails
-            + ". Not able to acquire the delete segment lock due to another delete "
-            + "operation is running in the background.";
-        LOG.audit(errorMsg);
-        LOG.error(errorMsg);
-        throw new Exception(errorMsg + " Please try after some time.");
-      }
-    } catch (IOException e) {
-      LOG.error("Error message: " + "IOException" + e.getMessage());
-    } finally {
-      CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
-      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
-    }
-
-    return invalidLoadTimestamps;
-  }
-
-  /**
-   * writes load details into a given file at @param dataLoadLocation
-   *
-   * @param dataLoadLocation
-   * @param listOfLoadFolderDetailsArray
-   * @throws IOException
-   */
-  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
-    AtomicFileOperations fileWrite =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-    BufferedWriter brWriter = null;
-    DataOutputStream dataOutputStream = null;
-    Gson gsonObjectToWrite = new Gson();
-    // write the updated data into the metadata file.
-
-    try {
-      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray);
-      brWriter.write(metadataInstance);
-    } catch (IOException ioe) {
-      LOG.error("Error message: " + ioe.getLocalizedMessage());
-    } finally {
-      if (null != brWriter) {
-        brWriter.flush();
-      }
-      CarbonUtil.closeStreams(brWriter);
-      fileWrite.close();
-    }
-
-  }
-
-  /**
-   * updates deletion status details for each load and returns invalidLoadIds
-   *
-   * @param loadIds
-   * @param listOfLoadFolderDetailsArray
-   * @param invalidLoadIds
-   * @return invalidLoadIds
-   */
-  private static List<String> updateDeletionStatus(List<String> loadIds,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) {
-    for (String loadId : loadIds) {
-      boolean loadFound = false;
-      // For each load id loop through data and if the
-      // load id is found then mark
-      // the metadata as deleted.
-      for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-
-        if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
-          // if the segment is compacted then no need to delete that.
-          if (CarbonCommonConstants.COMPACTED
-                  .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
-            LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
-            invalidLoadIds.add(loadId);
-            return invalidLoadIds;
-          }
-          if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
-            loadFound = true;
-            loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
-            loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
-            LOG.info("Segment ID " + loadId + " Marked for Delete");
-          }
-          break;
-        }
-      }
-
-      if (!loadFound) {
-        LOG.audit("Delete segment by ID is failed. No matching segment id found :" + loadId);
-        invalidLoadIds.add(loadId);
-        return invalidLoadIds;
-      }
-    }
-    return invalidLoadIds;
-  }
-
-  /**
-   * updates deletion status details for load and returns invalidLoadTimestamps
-   *
-   * @param loadDate
-   * @param listOfLoadFolderDetailsArray
-   * @param invalidLoadTimestamps
-   * @return invalidLoadTimestamps
-   */
-  public static List<String> updateDeletionStatus(String loadDate,
-      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
-      Long loadStartTime) {
-    // For each load timestamp loop through data and if the
-    // required load timestamp is found then mark
-    // the metadata as deleted.
-    boolean loadFound = false;
-    String loadStartTimeString = "Load Start Time: ";
-    for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-      Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
-      if (result < 0) {
-        if (CarbonCommonConstants.COMPACTED
-            .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
-          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
-              + "as the segment has been compacted.");
-          continue;
-        }
-        if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
-          loadFound = true;
-          updateSegmentMetadataDetails(loadMetadata);
-          LOG.info("Info: " +
-              loadStartTimeString + loadMetadata.getLoadStartTime() +
-              " Marked for Delete");
-        }
-      }
-
-    }
-
-    if (!loadFound) {
-      invalidLoadTimestamps.add(loadDate);
-      LOG.audit("Delete segment by date is failed. No matching segment found.");
-      return invalidLoadTimestamps;
-    }
-    return invalidLoadTimestamps;
-  }
-
-  /**
-   * This method closes the streams
-   *
-   * @param streams - streams to close.
-   */
-  private static void closeStreams(Closeable... streams) {
-    // Added if to avoid NullPointerException in case one stream is being passed as null
-    if (null != streams) {
-      for (Closeable stream : streams) {
-        if (null != stream) {
-          try {
-            stream.close();
-          } catch (IOException e) {
-            LOG.error("Error while closing stream" + stream);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * updates table status details using latest metadata
-   *
-   * @param oldMetadata
-   * @param newMetadata
-   * @return
-   */
-
-  public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
-      LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
-
-    List<LoadMetadataDetails> newListMetadata =
-        new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata));
-    for (LoadMetadataDetails oldSegment : oldMetadata) {
-      if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oldSegment.getLoadStatus())) {
-        updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment)));
-      }
-    }
-    return newListMetadata;
-  }
-
-  /**
-   * updates segment status and modificaton time details
-   *
-   * @param loadMetadata
-   */
-  public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
-    // update status only if the segment is not marked for delete
-    if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
-      loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
-      loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
-    }
-  }
-
-  /**
-   * This API will return the update status file name.
-   * @param segmentList
-   * @return
-   */
-  public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
-    if(segmentList.length == 0) {
-      return "";
-    }
-    else {
-      for(LoadMetadataDetails eachSeg : segmentList) {
-        // file name stored in 0th segment.
-        if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
-          return eachSeg.getUpdateStatusFileName();
-        }
-      }
-    }
-    return "";
-  }
-
-  /**
-   * getting the task numbers present in the segment.
-   * @param segmentId
-   * @return
-   */
-  public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
-          updateStatusManager) {
-    List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
-    for (String eachFileName : list) {
-      taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
-    }
-    return taskList;
-  }
-
-
-  public static class ValidAndInvalidSegmentsInfo {
-    private final List<String> listOfValidSegments;
-    private final List<String> listOfValidUpdatedSegments;
-    private final List<String> listOfInvalidSegments;
-
-    private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
-        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
-      this.listOfValidSegments = listOfValidSegments;
-      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
-      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
-    }
-    public List<String> getInvalidSegments() {
-      return listOfInvalidSegments;
-    }
-    public List<String> getValidSegments() {
-      return listOfValidSegments;
-    }
-  }
-}


Mime
View raw message