carbondata-commits mailing list archives

From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4062] Refactor clean files feature
Date Mon, 07 Dec 2020 15:51:19 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 74e967e  [CARBONDATA-4062] Refactor clean files feature
74e967e is described below

commit 74e967e7520b7fe7c71cca1d2910daa6c41d1248
Author: QiangCai <qiangcai@qq.com>
AuthorDate: Mon Dec 7 02:07:32 2020 +0800

    [CARBONDATA-4062] Refactor clean files feature
    
    Why is this PR needed?
    To prevent accidental deletion of data, carbon will introduce trash data management. It provides a buffer period during which an accidental delete operation can be rolled back.
    
    Trash data management is part of the carbon data lifecycle management. Clean files, acting as the data trash manager, consists of the following two parts.
    part 1: manage metadata-indexed data trash.
    This data stays at its original location in the table and is indexed by metadata. Carbon manages it through the metadata index and should avoid using the listFile() interface.
    part 2: manage the ".Trash" folder.
    Currently the ".Trash" folder has no metadata index; operations on it are based on timestamps and the listFile() interface. In the future, carbon will index the ".Trash" folder to improve data trash management.
    
    What changes were proposed in this PR?
    Remove the data cleaning function from all features, but keep the exception-handling part.
    Notes: the following features still clean data
    a) drop table/index/mv
    b) drop database
    Only the clean files function works as the data trash manager now.
    It supports concurrent operation with other features (loading, compaction, update/delete, and so on).
    
    Does this PR introduce any user interface change?
    Yes. The documentation is updated.
    
    Is any new testcase added?
    No
    
    This closes #4044
---
 .../core/constants/CarbonCommonConstants.java      |   2 +-
 .../carbondata/core/metadata/SegmentFileStore.java |   5 +-
 .../core/metadata/schema/table/CarbonTable.java    |   4 +
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 213 ---------------------
 .../core/statusmanager/LoadMetadataDetails.java    |   9 +-
 .../carbondata/core/util/CarbonProperties.java     |  25 +--
 .../carbondata/core/util/CleanFilesUtil.java       |  60 +++---
 .../carbondata/core/util/DeleteLoadFolders.java    |  37 ++--
 .../org/apache/carbondata/core/util/TrashUtil.java |  26 +--
 .../carbondata/core/util/path/CarbonTablePath.java |   4 +-
 docs/clean-files.md                                |   3 +
 .../org/apache/carbondata/api/CarbonStore.scala    | 141 +-------------
 .../carbondata/events/CleanFilesEvents.scala       |  17 +-
 .../{CleanFilesEvents.scala => package.scala}      |  30 +--
 .../scala/org/apache/carbondata/spark/KeyVal.scala |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           | 110 +++++------
 .../spark/rdd/CarbonTableCompactor.scala           |  63 ++++--
 .../carbondata/spark/rdd/CompactionFactory.scala   |   4 -
 .../apache/carbondata/spark/rdd/Compactor.scala    |  27 +--
 .../carbondata/spark/rdd/StreamHandoffRDD.scala    |   2 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  48 -----
 .../apache/carbondata/trash/DataTrashManager.scala | 167 ++++++++++++++++
 .../org/apache/carbondata/view/MVRefresher.scala   |   2 -
 .../command/carbonTableSchemaCommon.scala          |   3 +-
 .../execution/command/index/DropIndexCommand.scala |  10 +-
 .../command/management/CarbonAddLoadCommand.scala  |   2 +-
 .../CarbonAlterTableCompactionCommand.scala        |   2 -
 .../management/CarbonCleanFilesCommand.scala       | 168 ++--------------
 .../management/CarbonInsertIntoCommand.scala       |   4 -
 .../management/CarbonInsertIntoWithDf.scala        |   6 -
 .../command/management/CarbonLoadDataCommand.scala |  15 +-
 .../command/mutation/CarbonTruncateCommand.scala   |  37 ++--
 .../command/mutation/DeleteExecution.scala         |   2 +-
 .../command/table/CarbonDropTableCommand.scala     |   9 +-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   5 +-
 .../events/CleanFilesPostEventListener.scala       |  67 +++++--
 .../events/SIDropEventListener.scala               |   1 -
 .../hive/CarbonInternalMetastore.scala             |  15 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |  11 --
 .../scala/org/apache/spark/util/CleanFiles.scala   |  45 ++---
 .../cleanfiles/TestCleanFileCommand.scala          |   2 +-
 .../org/apache/spark/util/CarbonCommandSuite.scala |  10 +-
 .../loading/TableProcessingOperations.java         | 114 -----------
 .../processing/util/CarbonLoaderUtil.java          |  89 ++-------
 44 files changed, 511 insertions(+), 1109 deletions(-)
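
For illustration, a minimal standalone Java sketch (not part of the committed patch) of the two retention rules this change introduces: a timestamped subdirectory inside the ".Trash" folder expires once "carbon.trash.retention.days" has passed, while data outside the trash folder (segment directories in the table path) expires only after Max("carbon.trash.retention.days", "max.query.execution.time"). The 7-day default comes from this patch; the 60-minute query timeout below is an assumed default and is configurable.

import java.util.concurrent.TimeUnit;

// Sketch only; the committed logic lives in TrashUtil.isDataOutsideTrashIsExpired below.
public class TrashRetentionSketch {

  // Default from this patch: carbon.trash.retention.days = 7.
  static final long TRASH_RETENTION_MILLIS = TimeUnit.DAYS.toMillis(7);
  // Assumed default for max.query.execution.time (60 minutes).
  static final long MAX_QUERY_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(60);

  // Inside ".Trash": expired once the trash retention period has passed.
  static boolean insideTrashExpired(long fileTimestamp, long now) {
    return now - fileTimestamp > TRASH_RETENTION_MILLIS;
  }

  // Outside ".Trash": expired only when BOTH thresholds have passed,
  // i.e. after Max(retention days, max query execution time).
  static boolean outsideTrashExpired(long fileTimestamp, long now) {
    return now - fileTimestamp > TRASH_RETENTION_MILLIS
        && now - fileTimestamp > MAX_QUERY_TIMEOUT_MILLIS;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long threeDaysAgo = now - TimeUnit.DAYS.toMillis(3);
    long eightDaysAgo = now - TimeUnit.DAYS.toMillis(8);
    System.out.println(outsideTrashExpired(threeDaysAgo, now)); // false: retention not yet reached
    System.out.println(outsideTrashExpired(eightDaysAgo, now)); // true: both thresholds passed
    System.out.println(insideTrashExpired(eightDaysAgo, now));  // true
  }
}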

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 57fbfe0..c7bdfa8 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1432,7 +1432,7 @@ public final class CarbonCommonConstants {
   /**
    * Default retention time of a subdirectory in trash folder is 7 days.
    */
-  public static final int CARBON_TRASH_RETENTION_DAYS_DEFAULT = 7;
+  public static final String CARBON_TRASH_RETENTION_DAYS_DEFAULT = "7";
 
   /**
    * Maximum allowed retention time of a subdirectory in trash folder is 365 days.
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 803fc40..f8c8cb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -67,6 +67,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
+import org.apache.carbondata.core.util.TrashUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 
@@ -1044,7 +1045,7 @@ public class SegmentFileStore {
           Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
               .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
                   indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
-          if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+          if (TrashUtil.isDataOutsideTrashIsExpired(fileTimestamp) || forceDelete) {
             // Add the corresponding carbondata files to the delete list.
             toBeDeletedDataFiles.addAll(entry.getValue());
           }
@@ -1060,7 +1061,7 @@ public class SegmentFileStore {
                 .substring(indexFile.lastIndexOf(CarbonCommonConstants.UNDERSCORE) + 1,
                     indexFile.length() - CarbonTablePath.MERGE_INDEX_FILE_EXT.length()));
           }
-          if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+          if (TrashUtil.isDataOutsideTrashIsExpired(fileTimestamp) || forceDelete) {
             toBeDeletedIndexFiles.add(indexFile);
           }
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 4fd58cd..96084cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -465,6 +465,10 @@ public class CarbonTable implements Serializable, Writable {
     return tableInfo.getTableUniqueName();
   }
 
+  public String getQualifiedName() {
+    return getDatabaseName() + CarbonCommonConstants.POINT + getTableName();
+  }
+
   /**
    * Return true if local dictionary enabled for the table
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index eb022a1..c5296dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -483,176 +483,6 @@ public class CarbonUpdateUtil {
   }
 
   /**
-   * Handling of the clean up of old carbondata files, index files , delete delta,
-   * update status files.
-   * @param table clean up will be handled on this table.
-   * @param forceDelete if true then max query execution timeout will not be considered.
-   */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException {
-
-    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-    LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
-
-    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
-    SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
-    // hold all the segments updated so that wen can check the delta files in them, ne need to
-    // check the others.
-    Set<String> updatedSegments = new HashSet<>();
-    for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
-      updatedSegments.add(updateDetails.getSegmentName());
-    }
-
-    String validUpdateStatusFile = "";
-
-    boolean isAbortedFile = true;
-
-    boolean isInvalidFile = false;
-
-    // take the update status file name from 0th segment.
-    validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-    // scan through each segment.
-    for (LoadMetadataDetails segment : details) {
-      // if this segment is valid then only we will go for delta file deletion.
-      // if the segment is mark for delete or compacted then any way it will get deleted.
-      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
-              || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-        // when there is no update operations done on table, then no need to go ahead. So
-        // just check the update delta start timestamp and proceed if not empty
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
-                || updatedSegments.contains(segment.getLoadName())) {
-          // take the list of files from this segment.
-          String segmentPath = CarbonTablePath.getSegmentPath(
-              table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
-          CarbonFile segDir =
-              FileFactory.getCarbonFile(segmentPath);
-          CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-          // now handle all the delete delta files which needs to be deleted.
-          // there are 2 cases here .
-          // 1. if the block is marked as compacted then the corresponding delta files
-          //    can be deleted if query exec timeout is done.
-          // 2. if the block is in success state then also there can be delete
-          //    delta compaction happened and old files can be deleted.
-
-          SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-          for (SegmentUpdateDetails block : updateDetails) {
-            CarbonFile[] completeListOfDeleteDeltaFiles;
-            CarbonFile[] invalidDeleteDeltaFiles;
-
-            if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-              continue;
-            }
-
-            // aborted scenario.
-            invalidDeleteDeltaFiles = updateStatusManager
-                .getDeleteDeltaInvalidFilesList(block, false,
-                    allSegmentFiles, isAbortedFile);
-            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-              boolean doForceDelete = true;
-              compareTimestampsAndDelete(invalidFile, doForceDelete, false);
-            }
-
-            // case 1
-            if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-              completeListOfDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, true,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-
-            } else {
-              invalidDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, false,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-            }
-          }
-        }
-        // handle cleanup of merge index files and data files after small files merge happened for
-        // SI table
-        cleanUpDataFilesAfterSmallFilesMergeForSI(table, segment);
-      }
-    }
-
-    // delete the update table status files which are old.
-    if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
-
-      final String updateStatusTimestamp = validUpdateStatusFile
-          .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
-
-      String tablePath = table.getAbsoluteTableIdentifier().getTablePath();
-      CarbonFile metaFolder = FileFactory.getCarbonFile(
-          CarbonTablePath.getMetadataPath(tablePath));
-
-      CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
-            // CHECK if this is valid or not.
-            // we only send invalid ones to delete.
-            return !file.getName().endsWith(updateStatusTimestamp);
-          }
-          return false;
-        }
-      });
-
-      for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
-        compareTimestampsAndDelete(invalidFile, forceDelete, true);
-      }
-    }
-  }
-
-  /**
-   * this is the clean up added specifically for SI table, because after we merge the data files
-   * inside the secondary index table, we need to delete the stale carbondata files.
-   * refer org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD
-   */
-  private static void cleanUpDataFilesAfterSmallFilesMergeForSI(CarbonTable table,
-      LoadMetadataDetails segment) throws IOException {
-    if (table.isIndexTable()) {
-      String segmentPath = CarbonTablePath
-          .getSegmentPath(table.getAbsoluteTableIdentifier().getTablePath(),
-              segment.getLoadName());
-      CarbonFile segmentDirPath =
-          FileFactory.getCarbonFile(segmentPath);
-      CarbonFile[] allFilesOfSegment = segmentDirPath.listFiles();
-      long startTimeStampFinal = segment.getLoadStartTime();
-      long endTimeStampFinal = segment.getLoadEndTime();
-      boolean deleteFile;
-      for (CarbonFile file : allFilesOfSegment) {
-        deleteFile = false;
-        String fileTimestamp =
-            CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file.getName());
-        // check for old files before load start time and the aborted files after end time
-        if ((file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT) || file.getName()
-            .endsWith(CarbonTablePath.INDEX_FILE_EXT)) && (
-            Long.parseLong(fileTimestamp) < startTimeStampFinal
-                || Long.parseLong(fileTimestamp) > endTimeStampFinal)) {
-          deleteFile = true;
-        } else if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
-            && Long.parseLong(fileTimestamp) < startTimeStampFinal) {
-          deleteFile = true;
-        }
-        if (deleteFile) {
-          // delete the files and folders.
-          try {
-            LOGGER.info("Deleting the invalid file : " + file.getName());
-            CarbonUtil.deleteFoldersAndFiles(file);
-          } catch (IOException | InterruptedException e) {
-            LOGGER.error("Error in clean up of merged files." + e.getMessage(), e);
-          }
-        }
-      }
-    }
-  }
-
-
-  /**
    * This will tell whether the max query timeout has been expired or not.
    * @param fileTimestamp
    * @return
@@ -676,49 +506,6 @@ public class CarbonUpdateUtil {
 
   }
 
-  /**
-   *
-   * @param invalidFile
-   * @param forceDelete
-   * @param isUpdateStatusFile if true then the parsing of file name logic changes.
-   */
-  private static boolean compareTimestampsAndDelete(
-      CarbonFile invalidFile,
-      boolean forceDelete, boolean isUpdateStatusFile) {
-    boolean isDeleted = false;
-    Long fileTimestamp;
-
-    if (isUpdateStatusFile) {
-      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName()
-              .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
-    } else {
-      fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(
-              CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
-    }
-
-    // This check is because, when there are some invalid files like tableStatusUpdate.write files
-    // present in store [[which can happen during delete or update if the disk is full or hdfs quota
-    // is finished]] then fileTimestamp will be null, in that case check for max query out and
-    // delete the .write file after timeout
-    if (fileTimestamp == null) {
-      String tableUpdateStatusFilename = invalidFile.getName();
-      if (tableUpdateStatusFilename.endsWith(".write")) {
-        long tableUpdateStatusFileTimeStamp = Long.parseLong(
-            CarbonTablePath.DataFileUtil.getTimeStampFromFileName(tableUpdateStatusFilename));
-        if (isMaxQueryTimeoutExceeded(tableUpdateStatusFileTimeStamp)) {
-          isDeleted = deleteInvalidFiles(invalidFile);
-        }
-      }
-    } else {
-      // if the timestamp of the file is more than the current time by query execution timeout.
-      // then delete that file.
-      if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
-        isDeleted = deleteInvalidFiles(invalidFile);
-      }
-    }
-    return isDeleted;
-  }
-
   private static boolean deleteInvalidFiles(CarbonFile invalidFile) {
     boolean isDeleted;
     try {
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index b8ddf0b..d9c3910 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -171,7 +171,14 @@ public class LoadMetadataDetails implements Serializable {
    */
   public long getModificationOrDeletionTimestamp() {
     if (null == modificationOrDeletionTimestamp) {
-      return 0;
+      // 1. try to get last modified time
+      long lastModification = getLastModifiedTime();
+      if (lastModification == -1) {
+        // 2. get load start time
+        return getLoadStartTime();
+      } else {
+        return lastModification;
+      }
     }
     return convertTimeStampToLong(modificationOrDeletionTimestamp);
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 565a00b..a2cc314 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -2123,29 +2123,32 @@ public final class CarbonProperties {
    * folder will take place
    */
   private void validateTrashFolderRetentionTime() {
-    String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS_DEFAULT));
     try {
-      int configuredValue = Integer.parseInt(propertyValue);
-      if (configuredValue < 0 || configuredValue > CarbonCommonConstants
-          .CARBON_TRASH_RETENTION_DAYS_MAXIMUM) {
+      int configuredValue = getTrashFolderRetentionTime();
+      if (configuredValue < 0 ||
+          configuredValue > CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_MAXIMUM) {
         LOGGER.warn("Value of " + CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS + " is" +
             " invalid, taking default value instead");
-        carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer
-            .toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+        carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
+            CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT);
       } else {
         carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
-            propertyValue);
+            configuredValue + "");
       }
     } catch (NumberFormatException e) {
       LOGGER.error("Invalid value configured for " + CarbonCommonConstants
           .CARBON_TRASH_RETENTION_DAYS + ", considering the default value");
-      carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer
-          .toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
+          CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT);
     }
   }
 
+  public int getTrashFolderRetentionTime() {
+    return Integer.parseInt(carbonProperties.getProperty(
+        CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
+        CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+  }
+
   /**
    * Check if the user has allowed the use of clean files command with force option.
    */
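
For illustration, a hedged usage sketch (not part of the committed patch) of the getTrashFolderRetentionTime() accessor added above. It assumes the existing CarbonProperties.getInstance().addProperty(key, value) API; per this hunk, out-of-range values are reset to the default by validateTrashFolderRetentionTime().

import java.util.concurrent.TimeUnit;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class TrashRetentionConfigExample {
  public static void main(String[] args) {
    // Configure a 3-day trash retention period, then read it back.
    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "3");
    int days = CarbonProperties.getInstance().getTrashFolderRetentionTime();
    long retentionMillis = TimeUnit.DAYS.toMillis(days);
    System.out.println(days + " day(s) = " + retentionMillis + " ms");
  }
}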
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index bdfab6b..3a817a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -22,6 +22,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -54,34 +55,36 @@ public class CleanFilesUtil {
     List<String> redundantSegmentFile = new ArrayList<>();
     getStaleSegmentFiles(carbonTable, staleSegmentFiles, redundantSegmentFile);
     for (String staleSegmentFile : staleSegmentFiles) {
-      String segmentNumber = DataFileUtil.getSegmentNoFromSegmentFile(staleSegmentFile);
       SegmentFileStore fileStore = new SegmentFileStore(carbonTable.getTablePath(),
           staleSegmentFile);
-      Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getSegmentFile()
-          .getLocationMap();
-      if (locationMap != null) {
-        if (locationMap.entrySet().iterator().next().getValue().isRelative()) {
-          CarbonFile segmentPath = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(
-              carbonTable.getTablePath(), segmentNumber));
-          // copy the complete segment to the trash folder
-          TrashUtil.copySegmentToTrash(segmentPath, TrashUtil.getCompleteTrashFolderPath(
-              carbonTable.getTablePath(), timeStampForTrashFolder, segmentNumber));
-          // Deleting the stale Segment folders and the segment file.
-          try {
-            CarbonUtil.deleteFoldersAndFiles(segmentPath);
-            // delete the segment file as well
-            FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
-                staleSegmentFile));
-            for (String duplicateStaleSegmentFile : redundantSegmentFile) {
-              if (DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
-                  .equals(segmentNumber)) {
-                FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable
-                    .getTablePath(), duplicateStaleSegmentFile));
+      SegmentFileStore.SegmentFile segmentFile = fileStore.getSegmentFile();
+      if (segmentFile != null) {
+        Map<String, SegmentFileStore.FolderDetails> locationMap = segmentFile.getLocationMap();
+        if (locationMap != null) {
+          if (locationMap.entrySet().iterator().next().getValue().isRelative()) {
+            String segmentNumber = DataFileUtil.getSegmentNoFromSegmentFile(staleSegmentFile);
+            CarbonFile segmentPath = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(
+                carbonTable.getTablePath(), segmentNumber));
+            // copy the complete segment to the trash folder
+            TrashUtil.copySegmentToTrash(segmentPath, TrashUtil.getCompleteTrashFolderPath(
+                carbonTable.getTablePath(), timeStampForTrashFolder, segmentNumber));
+            // Deleting the stale Segment folders and the segment file.
+            try {
+              CarbonUtil.deleteFoldersAndFiles(segmentPath);
+              // delete the segment file as well
+              FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath(),
+                  staleSegmentFile));
+              for (String duplicateStaleSegmentFile : redundantSegmentFile) {
+                if (DataFileUtil.getSegmentNoFromSegmentFile(duplicateStaleSegmentFile)
+                    .equals(segmentNumber)) {
+                  FileFactory.deleteFile(CarbonTablePath.getSegmentFilePath(carbonTable
+                      .getTablePath(), duplicateStaleSegmentFile));
+                }
               }
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error("Unable to delete the segment: " + segmentPath + " from after moving" +
+                  " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
             }
-          } catch (IOException | InterruptedException e) {
-            LOGGER.error("Unable to delete the segment: " + segmentPath + " from after moving" +
-                " it to the trash folder. Please delete them manually : " + e.getMessage(), e);
           }
         }
       }
@@ -163,8 +166,13 @@ public class CleanFilesUtil {
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments.
+    // during compaction, we don't add in-progress metadata entry into tablestatus file,
+    // so here we don't known whether compaction segment is stale or not.
+    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> {
+      String segmentNo = DataFileUtil.getSegmentNoFromSegmentFile(segmentFile);
+      return !loadNameSet.contains(segmentNo) && !segmentNo.contains(CarbonCommonConstants.POINT);
+    }).collect(Collectors.toList());
     if (staleSegments.size() == 0) {
       return;
     }
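
For illustration, a small standalone sketch (not part of the committed patch) of why the filter above skips compaction segments: merged segments get numbers containing a dot (for example "0.1"), and since compaction writes no in-progress entry into the tablestatus file, such a segment cannot be assumed stale just because it is missing from loadNameSet. The sample values are hypothetical.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StaleSegmentFilterSketch {
  public static void main(String[] args) {
    // Hypothetical data: segment numbers found on disk vs. entries in tablestatus.
    List<String> segmentNosOnDisk = Arrays.asList("0", "1", "0.1", "4");
    Set<String> loadNameSet = new HashSet<>(Arrays.asList("0", "1"));
    // Same shape as the filter in CleanFilesUtil above: an unknown segment is
    // stale only if its number is not a compaction number (contains no '.').
    List<String> staleSegments = segmentNosOnDisk.stream()
        .filter(no -> !loadNameSet.contains(no) && !no.contains("."))
        .collect(Collectors.toList());
    System.out.println(staleSegments); // [4] -- "0.1" is never treated as stale
  }
}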
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index e1b4663..460b4e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
@@ -210,32 +209,18 @@ public final class DeleteLoadFolders {
      * if cleanStaleInProgress == true and  isForceDelete == true, clean MFD, Compacted and
      *  stale inprogress segments immediately.(Do not check for any timeout)
      */
-    if (isForceDelete) {
-      // immediately delete compacted and MFD
-      if (SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus
-          .MARKED_FOR_DELETE == oneLoad.getSegmentStatus()) {
-        return true;
-      }
-      // immediately delete inprogress segments if cleanstaleinprogress is true
-      return cleanStaleInProgress && (SegmentStatus.INSERT_IN_PROGRESS == oneLoad
-          .getSegmentStatus() || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad
-          .getSegmentStatus());
-    }
-    long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
-    // in case there is no deletion or modification timestamp, take the load start time as
-    // deleteTime
-    if (deletionTime == 0) {
-      deletionTime = oneLoad.getLoadStartTime();
-    }
-    if (SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus
-        .MARKED_FOR_DELETE == oneLoad.getSegmentStatus()) {
-      // delete MFD, compacted segments after checking trash timeout and query timeout
-      return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) && CarbonUpdateUtil
-        .isMaxQueryTimeoutExceeded(deletionTime);
+    boolean canDelete = isForceDelete || TrashUtil.isDataOutsideTrashIsExpired(
+        oneLoad.getModificationOrDeletionTimestamp());
+    switch (oneLoad.getSegmentStatus()) {
+      case COMPACTED:
+      case MARKED_FOR_DELETE:
+        return canDelete;
+      case INSERT_IN_PROGRESS:
+      case INSERT_OVERWRITE_IN_PROGRESS:
+        return canDelete && cleanStaleInProgress;
+      default:
+        return false;
     }
-    return (SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() || SegmentStatus
-        .INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus()) && cleanStaleInProgress &&
-        TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime);
   }
 
   private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
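
For illustration, a simplified standalone restatement (not part of the committed patch) of the deletion decision in the refactored switch above. Here expiredOutsideTrash stands in for TrashUtil.isDataOutsideTrashIsExpired(...) applied to the segment's modification-or-deletion timestamp; the class, enum, and method names are hypothetical.

public class DeleteDecisionSketch {

  enum Status { COMPACTED, MARKED_FOR_DELETE, INSERT_IN_PROGRESS, INSERT_OVERWRITE_IN_PROGRESS, SUCCESS }

  // MFD/Compacted segments are deletable once expired (or when force is set);
  // stale in-progress segments additionally require cleanStaleInProgress;
  // every other status is never touched by clean files.
  static boolean canDelete(Status status, boolean expiredOutsideTrash,
      boolean isForceDelete, boolean cleanStaleInProgress) {
    boolean expiredOrForced = isForceDelete || expiredOutsideTrash;
    switch (status) {
      case COMPACTED:
      case MARKED_FOR_DELETE:
        return expiredOrForced;
      case INSERT_IN_PROGRESS:
      case INSERT_OVERWRITE_IN_PROGRESS:
        return expiredOrForced && cleanStaleInProgress;
      default:
        return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(canDelete(Status.MARKED_FOR_DELETE, false, true, false));  // true: forced
    System.out.println(canDelete(Status.INSERT_IN_PROGRESS, true, false, false)); // false: flag off
    System.out.println(canDelete(Status.SUCCESS, true, true, true));              // false: valid segment
  }
}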
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index abbb3c2..c9db279 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -192,20 +192,20 @@ public final class TrashUtil {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * check If the fileTimestamp is expired based on `carbon.trash.retention.days`
+   */
+  private static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
+    int retentionDays = CarbonProperties.getInstance().getTrashFolderRetentionTime();
+    long retentionMilliSeconds = TimeUnit.DAYS.toMillis(1) * retentionDays;
+    return CarbonUpdateUtil.readCurrentTime() - fileTimestamp > retentionMilliSeconds;
+  }
+
+  /**
+   * whether trash data outside of .Trash folder is time out
    */
-  public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
-    // record current time.
-    long currentTime = CarbonUpdateUtil.readCurrentTime();
-    long retentionMilliSeconds = (long)Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer.toString(
-          CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * TimeUnit.DAYS
-        .toMillis(1);
-    long difference = currentTime - fileTimestamp;
-    return difference > retentionMilliSeconds;
+  public static boolean isDataOutsideTrashIsExpired(long fileTimestamp) {
+    return isTrashRetentionTimeoutExceeded(fileTimestamp) &&
+        CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 40bf3f8..458ebdf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -796,7 +796,7 @@ public class CarbonTablePath {
     }
   }
 
-  public static String getTrashFolderPath(String carbonTablePath) {
-    return carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + TRASH_DIR;
+  public static String getTrashFolderPath(String tablePath) {
+    return tablePath + CarbonCommonConstants.FILE_SEPARATOR + TRASH_DIR;
   }
 }
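
For illustration, a hedged sketch (not part of the committed patch) of how the trash location is assembled: getTrashFolderPath places ".Trash" directly under the table path, and each clean-files run groups the segments it moves under a timestamp subdirectory (see the docs change below). The table path and the exact per-segment subfolder name here are assumptions.

public class TrashPathSketch {
  static final String FILE_SEPARATOR = "/";
  static final String TRASH_DIR = ".Trash";

  // Mirrors getTrashFolderPath(tablePath) above.
  static String trashFolderPath(String tablePath) {
    return tablePath + FILE_SEPARATOR + TRASH_DIR;
  }

  public static void main(String[] args) {
    String tablePath = "/warehouse/db1/t1";            // hypothetical table path
    String trashRoot = trashFolderPath(tablePath);     // /warehouse/db1/t1/.Trash
    long timeStampForTrashFolder = 1607356800000L;     // one clean-files run
    // Segments moved by that run land under the timestamp subdirectory, e.g.
    // /warehouse/db1/t1/.Trash/1607356800000/Segment_2 (segment folder naming assumed).
    System.out.println(trashRoot + FILE_SEPARATOR + timeStampForTrashFolder
        + FILE_SEPARATOR + "Segment_2");
  }
}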
diff --git a/docs/clean-files.md b/docs/clean-files.md
index db4885b..baf81db 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -38,6 +38,9 @@ The above clean files command will clean Marked For Delete and Compacted segment
    ``` 
   Once the timestamp subdirectory is expired as per the configured expiration day value, that subdirectory is deleted from the trash folder in the subsequent clean files command.
 
+**NOTE**:
+  * In trash folder, the retention time is "carbon.trash.retention.days"
+  * Outside trash folder(Segment Directories in table path), the retention time is Max("carbon.trash.retention.days", "max.query.execution.time")
 ### FORCE OPTION
 The force option with clean files command deletes all the files and folders from the trash folder and delete the Marked for Delete and Compacted segments immediately. Since Clean Files operation with force option will delete data that can never be recovered, the force option by default is disabled. Clean files with force option is only allowed when the carbon property ```carbon.clean.file.force.allowed``` is set to true. The default value of this property is false.
                                                                                                                                                                        
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 160cdd1..84cc3ad 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -35,13 +35,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager, StageInput}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -287,141 +283,6 @@ object CarbonStore {
     (dataSize, indexSize)
   }
 
-  /**
-   * The method deletes all data if forceTableCLean <true> and lean garbage segment
-   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
-   *
-   * @param dbName          : Database name
-   * @param tableName       : Table name
-   * @param tablePath       : Table path
-   * @param carbonTable     : CarbonTable Object <null> in case of force clean
-   * @param forceTableClean : <true> for force clean it will delete all data
-   *                        <false> it will clean garbage segment (MARKED_FOR_DELETE state)
-   * @param currentTablePartitions : Hive Partitions  details
-   */
-  def cleanFiles(
-      dbName: String,
-      tableName: String,
-      tablePath: String,
-      carbonTable: CarbonTable,
-      forceTableClean: Boolean,
-      isForceDelete: Boolean,
-      cleanStaleInprogress: Boolean,
-      currentTablePartitions: Option[Seq[PartitionSpec]] = None,
-      truncateTable: Boolean = false): Unit = {
-    // CleanFiles API is also exposed to the user, if they call this API with isForceDelete = true,
-    // need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
-    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
-      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
-          " recovered. It is disabled by default, to enable clean files with force option," +
-          " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
-      throw new RuntimeException("Clean files with force operation not permitted by default")
-    }
-    var carbonCleanFilesLock: ICarbonLock = null
-    val absoluteTableIdentifier = if (forceTableClean) {
-      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
-    } else {
-      carbonTable.getAbsoluteTableIdentifier
-    }
-    try {
-      val errorMsg = "Clean files request is failed for " +
-                     s"$dbName.$tableName" +
-                     ". Not able to acquire the clean files lock due to another clean files " +
-                     "operation is running in the background."
-      // in case of force clean the lock is not required
-      if (forceTableClean) {
-        FileFactory.deleteAllCarbonFilesOfDir(
-          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
-      } else {
-        carbonCleanFilesLock =
-          CarbonLockUtil
-            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
-        if (truncateTable) {
-          SegmentStatusManager.truncateTable(carbonTable)
-        }
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
-          isForceDelete, currentTablePartitions.map(_.asJava).orNull, cleanStaleInprogress, true)
-        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
-        currentTablePartitions match {
-          case Some(partitions) =>
-            SegmentFileStore.cleanSegments(
-              carbonTable,
-              currentTablePartitions.map(_.asJava).orNull,
-              true)
-          case _ =>
-        }
-      }
-    } finally {
-      if (currentTablePartitions.equals(None)) {
-        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
-      } else {
-        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
-      }
-
-      if (carbonCleanFilesLock != null) {
-        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
-      }
-    }
-  }
-
-  /**
-   * delete partition folders recursively
-   *
-   * @param carbonTable
-   * @param partitionSpecList
-   */
-  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
-      partitionSpecList: List[PartitionSpec]): Unit = {
-    if (carbonTable != null && carbonTable.isHivePartitionTable) {
-      val loadMetadataDetails = SegmentStatusManager
-        .readLoadMetadata(carbonTable.getMetadataPath)
-
-      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
-
-      // list all files from table path
-      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
-      loadMetadataDetails.foreach { metadataDetail =>
-        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
-            metadataDetail.getSegmentFile == null) {
-          val loadStartTime: Long = metadataDetail.getLoadStartTime
-          // delete all files of @loadStartTime from table path
-          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
-          partitionSpecList.foreach {
-            partitionSpec =>
-              val partitionLocation = partitionSpec.getLocation
-              // For partition folder outside the tablePath
-              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
-                val partitionCarbonFile = FileFactory
-                  .getCarbonFile(partitionLocation.toString)
-                // list all files from partitionLocation
-                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
-                // delete all files of @loadStartTime from externalPath
-                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
-              }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   *
-   * @param carbonFiles
-   * @param timestamp
-   */
-  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
-      timestamp: Long): Unit = {
-    carbonFiles.asScala.foreach {
-      carbonFile =>
-        val filePath = carbonFile.getPath
-        val fileName = carbonFile.getName
-        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
-          // delete the file
-          FileFactory.deleteFile(filePath)
-        }
-    }
-  }
-
   // validates load ids
   private def validateLoadIds(loadIds: Seq[String]): Unit = {
     if (loadIds.isEmpty) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
index 509e166..44add8b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
@@ -20,20 +20,11 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
-/**
- *
- * @param carbonTable
- * @param sparkSession
- */
 case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSession)
   extends Event with CleanFilesEventInfo
 
-
-/**
- *
- * @param carbonTable
- * @param sparkSession
- */
-case class CleanFilesPostEvent(carbonTable: CarbonTable, cleanStaleInProgress: Boolean,
-    ifForceDelete: Boolean, sparkSession: SparkSession)
+case class CleanFilesPostEvent(
+    carbonTable: CarbonTable,
+    sparkSession: SparkSession,
+    options: Map[String, String])
   extends Event with CleanFilesEventInfo
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
similarity index 58%
copy from integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
copy to integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
index 509e166..f4b4caf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
@@ -14,26 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.carbondata.events
 
-import org.apache.spark.sql.SparkSession
+package org.apache.carbondata
 
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-
-/**
- *
- * @param carbonTable
- * @param sparkSession
- */
-case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSession)
-  extends Event with CleanFilesEventInfo
-
-
-/**
- *
- * @param carbonTable
- * @param sparkSession
- */
-case class CleanFilesPostEvent(carbonTable: CarbonTable, cleanStaleInProgress: Boolean,
-    ifForceDelete: Boolean, sparkSession: SparkSession)
-  extends Event with CleanFilesEventInfo
+package object events {
+  def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {
+    val operationContext = new OperationContext
+    OperationListenerBus.getInstance.fireEvent(preEvent, operationContext)
+    func
+    OperationListenerBus.getInstance.fireEvent(postEvent, operationContext)
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index f4de2ea..31d249c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -75,12 +75,12 @@ class updateResultImpl
   }
 }
 
-trait DeleteDelateResult[K, V] extends Serializable {
+trait DeleteDeltaResult[K, V] extends Serializable {
   def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors, Long)): (K, V)
 }
 
 class DeleteDeltaResultImpl
-  extends DeleteDelateResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] {
+  extends DeleteDeltaResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] {
   override def getKey(key: SegmentStatus,
       value: (SegmentUpdateDetails, ExecutionErrors, Long)): (SegmentStatus, (SegmentUpdateDetails,
     ExecutionErrors, Long)) = {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0b34f81..09059b1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -19,17 +19,14 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.File
 import java.util
-import java.util.concurrent._
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
-import scala.util.Random
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, DataLoadWrapperRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -45,7 +42,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
@@ -54,16 +50,14 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarForma
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonUtil, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.view.{MVSchema, MVStatus}
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.BlockDetails
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
-import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
@@ -82,7 +76,6 @@ object CarbonDataRDDFactory {
 
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storeLocation: String,
       compactionType: CompactionType,
       carbonTable: CarbonTable,
       compactedSegments: java.util.List[String],
@@ -106,7 +99,6 @@ object CarbonDataRDDFactory {
         startCompactionThreads(
           sqlContext,
           carbonLoadModel,
-          storeLocation,
           compactionModel,
           lock,
           compactedSegments,
@@ -142,12 +134,10 @@ object CarbonDataRDDFactory {
 
   def startCompactionThreads(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storeLocation: String,
       compactionModel: CompactionModel,
       compactionLock: ICarbonLock,
       compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
-    val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
     carbonLoadModel.readAndSetLoadMetadataDetails()
 
@@ -156,9 +146,7 @@ object CarbonDataRDDFactory {
         val compactor = CompactionFactory.getCompactor(
           carbonLoadModel,
           compactionModel,
-          executor,
           sqlContext,
-          storeLocation,
           compactedSegments,
           operationContext)
         try {
@@ -212,9 +200,7 @@ object CarbonDataRDDFactory {
                 CompactionFactory.getCompactor(
                   newCarbonLoadModel,
                   newCompactionModel,
-                  executor,
                   sqlContext,
-                  storeLocation,
                   compactedSegments,
                   operationContext).executeCompaction()
               } catch {
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
             throw new Exception("Exception in compaction " + exception.getMessage)
           }
         } finally {
-          executor.shutdownNow()
-          try {
-            compactor.deletePartialLoadsInCompaction()
-          } catch {
-            // no need to throw this as compaction is over
-            case ex: Exception =>
-          } finally {
-            compactionLock.unlock()
-          }
+          compactionLock.unlock()
         }
       }
     }
@@ -461,6 +439,7 @@ object CarbonDataRDDFactory {
         LOGGER.info(errorMessage)
         LOGGER.error(ex)
     }
+    var isLoadingCommitted = false
     try {
       val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
         .asInstanceOf[String]
@@ -470,7 +449,7 @@ object CarbonDataRDDFactory {
         LOGGER.info("********starting clean up**********")
         if (carbonLoadModel.isCarbonTransactionalTable) {
           // delete segment is applicable for transactional table
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+          CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
           clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
@@ -487,7 +466,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********starting clean up**********")
           if (carbonLoadModel.isCarbonTransactionalTable) {
             // delete segment is applicable for transactional table
-            CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+            CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
             clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
           }
           LOGGER.info("********clean up done**********")
@@ -558,7 +537,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********starting clean up**********")
           if (carbonLoadModel.isCarbonTransactionalTable) {
             // delete segment is applicable for transactional table
-            CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+            CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
             // delete corresponding segment file from metadata
             val segmentFile =
               CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
+        operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
+    }
+    try {
+      // compaction handling
+      if (carbonTable.isHivePartitionTable) {
+        carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+      }
+      val compactedSegments = new util.ArrayList[String]()
+      handleSegmentMerging(sqlContext,
+        carbonLoadModel
+          .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
+        carbonTable,
+        compactedSegments,
+        operationContext)
+      carbonLoadModel.setMergedSegmentIds(compactedSegments)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(
+          "Auto-Compaction has failed. Ignoring this exception because the load is passed.", e)
     }
   }
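Note: the fragments above split one change across two hunks, so, pieced together, the new control flow is roughly the following (a simplified sketch; only isLoadingCommitted, segmentLock, writtenSegment and triggerEventsAfterLoading come from the code above, everything else is elided):

    var isLoadingCommitted = false
    try {
      // ... write data and update table status ...
      isLoadingCommitted = true
      writtenSegment
    } finally {
      // release the segment lock first, then run pre-priming and
      // auto-compaction only when the load actually committed
      segmentLock.unlock()
      if (isLoadingCommitted) {
        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
      }
    }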
 
@@ -650,15 +637,6 @@ object CarbonDataRDDFactory {
         CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
           TableIdentifier(carbonTable.getTableName,
             Some(carbonTable.getDatabaseName))), None)
-      var storeLocation = ""
-      val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != configuredStore && configuredStore.nonEmpty) {
-        storeLocation = configuredStore(Random.nextInt(configuredStore.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
       val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
@@ -668,7 +646,6 @@ object CarbonDataRDDFactory {
       if (!isConcurrentCompactionAllowed) {
         handleCompactionForSystemLocking(sqlContext,
           carbonLoadModel,
-          storeLocation,
           CompactionType.MINOR,
           carbonTable,
           compactedSegments,
@@ -687,7 +664,6 @@ object CarbonDataRDDFactory {
               LOGGER.info("Acquired the compaction lock.")
               startCompactionThreads(sqlContext,
                 carbonLoadModel,
-                storeLocation,
                 compactionModel,
                 lock,
                 compactedSegments,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 30b59f5..9e1369b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -53,19 +53,19 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
 import org.apache.carbondata.spark.MergeResultImpl
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
 import org.apache.carbondata.spark.util.CarbonSparkUtil
+import org.apache.carbondata.trash.DataTrashManager
 import org.apache.carbondata.view.MVManagerInSpark
 
 /**
  * This class is used to perform compaction on carbon table.
  */
-class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
+class CarbonTableCompactor(
+    carbonLoadModel: CarbonLoadModel,
     compactionModel: CompactionModel,
-    executor: ExecutorService,
     sqlContext: SQLContext,
-    storeLocation: String,
     compactedSegments: List[String],
     operationContext: OperationContext)
-  extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+  extends Compactor(carbonLoadModel, compactionModel) {
 
   private def needSortSingleSegment(
       loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
@@ -90,7 +90,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
-      deletePartialLoadsInCompaction()
       val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
       var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
       loadsToMerge.asScala.foreach { segmentId =>
@@ -98,7 +97,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           .getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
             .getAbsoluteTableIdentifier,
             CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
-        segmentLock.lockWithRetries()
+        if (!segmentLock.lockWithRetries()) {
+          throw new Exception(s"Failed to acquire lock on segment ${segmentId.getLoadName}" +
+            s" during compaction of table ${compactionModel.carbonTable.getQualifiedName}")
+        }
         segmentLocks += segmentLock
       }
       try {
@@ -174,7 +176,18 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       compactionModel.compactionType,
       compactionModel.currentPartitions,
       compactedSegments)
-    triggerCompaction(compactionCallableModel, mergedLoadName: String)
+    try {
+      triggerCompaction(compactionCallableModel, mergedLoadName: String)
+    } catch {
+      case e: Throwable =>
+        // clean stale compaction segment immediately after compaction failure
+        DataTrashManager.cleanStaleCompactionSegment(
+          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+          mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1),
+          carbonLoadModel.getFactTimeStamp,
+          compactionCallableModel.compactedPartitions)
+        throw e
+    }
   }
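A note on the mergedSegmentId argument above: CarbonDataMergerUtil.getMergedLoadName returns a name of the form "Segment_<id>", so splitting on the underscore extracts the id of the segment produced by this compaction. A tiny illustration (the example value is assumed, not taken from a real run):

    // e.g. mergedLoadName = "Segment_0.1" after minor compaction of segments 0 and 1
    val mergedSegmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1)  // "0.1"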
 
   private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
@@ -231,6 +244,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       .sparkContext
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
 
+    var mergeRDD: CarbonMergerRDD[String, Boolean] = null
+    if (carbonTable.isHivePartitionTable) {
+      // collect related partitions
+      mergeRDD = new CarbonMergerRDD(
+        sc.sparkSession,
+        new MergeResultImpl(),
+        carbonLoadModel,
+        carbonMergerMapping,
+        segmentMetaDataAccumulator
+      )
+      val partitionSpecs = mergeRDD.getPartitions.map { partition =>
+        partition.asInstanceOf[CarbonSparkPartition].partitionSpec.get
+      }
+      if (partitionSpecs != null && partitionSpecs.nonEmpty) {
+        compactionCallableModel.compactedPartitions = Some(partitionSpecs)
+      }
+    }
+
     val mergeStatus =
       if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
           !carbonTable.getSortColumns.isEmpty &&
@@ -241,13 +272,17 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           carbonMergerMapping,
           segmentMetaDataAccumulator)
       } else {
-        new CarbonMergerRDD(
-          sc.sparkSession,
-          new MergeResultImpl(),
-          carbonLoadModel,
-          carbonMergerMapping,
-          segmentMetaDataAccumulator
-        ).collect
+        if (mergeRDD != null) {
+          mergeRDD.collect
+        } else {
+          new CarbonMergerRDD(
+            sc.sparkSession,
+            new MergeResultImpl(),
+            carbonLoadModel,
+            carbonMergerMapping,
+            segmentMetaDataAccumulator
+          ).collect
+        }
       }
 
     if (mergeStatus.length == 0) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
index dc0deb9..0b126a3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
@@ -32,17 +32,13 @@ object CompactionFactory {
    */
   def getCompactor(carbonLoadModel: CarbonLoadModel,
       compactionModel: CompactionModel,
-      executor: ExecutorService,
       sqlContext: SQLContext,
-      storeLocation: String,
       mergedLoads: java.util.List[String],
       operationContext: OperationContext): Compactor = {
     new CarbonTableCompactor(
       carbonLoadModel,
       compactionModel,
-      executor,
       sqlContext,
-      storeLocation,
       mergedLoads,
       operationContext)
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 425e9de..fadf938 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -18,24 +18,17 @@
 package org.apache.carbondata.spark.rdd
 
 import java.util
-import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.CompactionModel
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
-abstract class Compactor(carbonLoadModel: CarbonLoadModel,
-    compactionModel: CompactionModel,
-    executor: ExecutorService,
-    sqlContext: SQLContext,
-    storeLocation: String) {
+abstract class Compactor(carbonLoadModel: CarbonLoadModel, compactionModel: CompactionModel) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -55,20 +48,4 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
         compactionModel.compactionType,
         customSegmentIds)
   }
-
-  def deletePartialLoadsInCompaction(): Unit = {
-    // Deleting the any partially loaded data if present.
-    // in some case the segment folder which is present in store will not have entry in
-    // status.
-    // so deleting those folders.
-    try {
-      TableProcessingOperations
-        .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, true)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-                     s" ${ e.getMessage }")
-    }
-  }
-
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 3da5d10..bb73d4a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -332,7 +332,7 @@ object StreamHandoffRDD {
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
       CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
       LOGGER.info("********starting clean up**********")
-      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+      CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel)
       LOGGER.info("********clean up done**********")
       LOGGER.error("Cannot write load metadata file as handoff failed")
       throw new Exception(errorMessage)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index b355c07..20e1f56 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -566,54 +566,6 @@ object CommonUtil {
     ThreadLocalTaskInfo.clearCarbonTaskInfo()
   }
 
-  /**
-   * The in-progress segments which are in stale state will be marked as deleted
-   * when driver is initializing.
-   * @param databaseLocation
-   * @param dbName
-   */
-  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
-    val loaderDriver = CarbonProperties.getInstance().
-      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
-        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
-    if (!loaderDriver) {
-      return
-    }
-    try {
-      if (FileFactory.isFileExist(databaseLocation)) {
-        val file = FileFactory.getCarbonFile(databaseLocation)
-        if (file.isDirectory) {
-          val tableFolders = file.listFiles()
-          tableFolders.foreach { tableFolder =>
-            if (tableFolder.isDirectory) {
-              val tablePath = databaseLocation +
-                              CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
-              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
-              val tableStatusFile =
-                CarbonTablePath.getTableStatusFilePath(tablePath)
-              if (FileFactory.isFileExist(tableStatusFile)) {
-                try {
-                  val carbonTable = CarbonMetadata.getInstance
-                    .getCarbonTable(tableUniqueName)
-                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, false, null,
-                    true, true)
-                } catch {
-                  case _: Exception =>
-                    LOGGER.warn(s"Error while cleaning table " +
-                                s"${ tableUniqueName }")
-                }
-              }
-            }
-          }
-        }
-      }
-    } catch {
-      case s: java.io.FileNotFoundException =>
-        // Create folders and files.
-        LOGGER.error(s)
-    }
-  }
-
   def getScaleAndPrecision(dataType: String): (Int, Int) = {
     val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
     m.find()
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
new file mode 100644
index 0000000..75c4751
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time;
+   *                             they are cleaned immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true, throw an exception when CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+      checkAndCleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding stale compaction (x.y) segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def checkAndCleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compaction segment immediately after a compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedSegmentId: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedSegmentId.equals(detail.getLoadName))
+    // only clean it when it is a stale compaction segment (no entry in the table status)
+    if (loadDetail.isEmpty) {
+      if (carbonTable.isHivePartitionTable && partitionSpecs.isDefined) {
+        partitionSpecs.get.foreach { partitionSpec =>
+          cleanStaleCompactionDataFiles(
+            partitionSpec.getLocation.toString, mergedSegmentId, factTimestamp)
+        }
+      } else {
+        cleanStaleCompactionDataFiles(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, mergedSegmentId),
+          mergedSegmentId,
+          factTimestamp)
+      }
+    }
+  }
+
+  private def cleanStaleCompactionDataFiles(
+      folderPath: String,
+      segmentId: String,
+      factTimestamp: Long): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      val namePart = CarbonCommonConstants.HYPHEN + segmentId +
+        CarbonCommonConstants.HYPHEN + factTimestamp
+      val toBeDeleted = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(namePart)
+        }
+      })
+      if (toBeDeleted != null && toBeDeleted.nonEmpty) {
+        try {
+          CarbonUtil.deleteFoldersAndFilesSilent(toBeDeleted: _*)
+        } catch {
+          case e: Throwable =>
+            LOGGER.error(
+              s"Failed to clean stale data under folder $folderPath, match filter: $namePart", e)
+        }
+      }
+    }
+  }
+}
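For reference, DataTrashManager.cleanGarbageData above is the single entry point that the CLEAN FILES command now delegates to. A minimal usage sketch (the table name is illustrative; the force option additionally requires the CARBON_CLEAN_FILES_FORCE_ALLOWED property to be enabled, as checked at the top of cleanGarbageData):

    // default: clean the trash folder by retention time, move stale segments to .Trash,
    // and delete expired MARKED_FOR_DELETE/Compacted segments
    spark.sql("CLEAN FILES FOR TABLE default.demo_table")
    // additionally clean stale In Progress segments that crossed the retention time
    spark.sql("CLEAN FILES FOR TABLE default.demo_table OPTIONS('stale_inprogress'='true')")
    // force: empty the trash folder and clean MFD/Compacted segments immediately
    spark.sql("CLEAN FILES FOR TABLE default.demo_table OPTIONS('force'='true')")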
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index 70b766b..152fbb4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -65,8 +65,6 @@ object MVRefresher {
       viewSchema.getIdentifier.getTableId)
     val viewIdentifier = viewSchema.getIdentifier
     val viewTableIdentifier = viewTable.getAbsoluteTableIdentifier
-    // Clean up the old invalid segment data before creating a new entry for new load.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(viewTable, false, null, false, false)
     val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(viewTableIdentifier)
     // Acquire table status lock to handle concurrent data loading
     val lock: ICarbonLock = segmentStatusManager.getTableStatusLock
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index c02911d..22b1989 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -136,7 +136,8 @@ case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
     sqlContext: SQLContext,
     compactionType: CompactionType,
     currentPartitions: Option[Seq[PartitionSpec]],
-    compactedSegments: java.util.List[String])
+    compactedSegments: java.util.List[String],
+    var compactedPartitions: Option[Seq[PartitionSpec]] = None)
 
 case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
     segmentId: String,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala
index 45a93bc..f73ce37 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala
@@ -58,6 +58,8 @@ private[sql] case class DropIndexCommand(
     if (!parentTable.getIndexTableNames().contains(indexName)) {
       if (!ifExistsSet) {
         throw new MalformedIndexCommandException("Index with name " + indexName + " does not exist")
+      } else {
+        return Seq.empty
       }
     }
     if (parentTable.getIndexTableNames(IndexType.SI.getIndexProviderName)
@@ -206,19 +208,13 @@ private[sql] case class DropIndexCommand(
           logInfo("Table MetaData Unlocked Successfully")
           if (isValidDeletion) {
             if (carbonTable != null && carbonTable.isDefined) {
-              CarbonInternalMetastore.deleteTableDirectory(dbName, indexName, sparkSession)
+              CarbonInternalMetastore.deleteTableDirectory(carbonTable.get)
             }
           }
         } else {
           logError("Table metadata unlocking is unsuccessful, index table may be in stale state")
         }
       }
-      // in case if the the physical folders still exists for the index table
-      // but the carbon and hive info for the index table is removed,
-      // DROP INDEX IF EXISTS should clean up those physical directories
-      if (ifExistsSet && carbonTable.isEmpty) {
-        CarbonInternalMetastore.deleteTableDirectory(dbName, indexName, sparkSession)
-      }
     }
     Seq.empty
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 5fc16d5..7dd0660 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -324,7 +324,7 @@ case class CarbonAddLoadCommand(
       CarbonLoaderUtil.updateTableStatusForFailure(model, "uniqueTableStatusId")
       LOGGER.info("********starting clean up**********")
       // delete segment is applicable for transactional table
-      CarbonLoaderUtil.deleteSegment(model, model.getSegmentId.toInt)
+      CarbonLoaderUtil.deleteSegmentForFailure(model)
       // delete corresponding segment file from metadata
       val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
                         File.separator + segment.getSegmentFileName
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 6ab65d3..a09f929 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -287,7 +287,6 @@ case class CarbonAlterTableCompactionCommand(
       CarbonDataRDDFactory.handleCompactionForSystemLocking(
         sqlContext,
         carbonLoadModel,
-        storeLocation,
         compactionType,
         carbonTable,
         compactedSegments,
@@ -319,7 +318,6 @@ case class CarbonAlterTableCompactionCommand(
         CarbonDataRDDFactory.startCompactionThreads(
           sqlContext,
           carbonLoadModel,
-          storeLocation,
           compactionModel,
           lock,
           compactedSegments,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index c0ca47a..3f956e2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,178 +17,50 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
-import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CleanFilesUtil, TrashUtil}
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.view.MVManagerInSpark
+import org.apache.carbondata.trash.DataTrashManager
 
 /**
- * Clean data in table
- * If table name is specified and forceTableClean is false, it will clean garbage
- * segment (MARKED_FOR_DELETE state).
- * If table name is specified and forceTableClean is true, it will delete all data
- * in the table.
- * If table name is not provided, it will clean garbage segment in all tables.
+ * Clean garbage data in the table. It invokes DataTrashManager.cleanGarbageData to do the work.
  */
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
-    tableName: Option[String],
-    options: Option[List[(String, String)]],
-    forceTableClean: Boolean = false,
-    isInternalCleanCall: Boolean = false,
-    truncateTable: Boolean = false)
-  extends AtomicRunnableCommand {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-  var carbonTable: CarbonTable = _
-  var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
-  val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
-  // forceClean will clean the MFD and Compacted segments immediately and also empty the trash
-  // folder
-  val forceClean = optionsMap.getOrElse("force", "false").toBoolean
-  // stale_inprogress will clean the In Progress segments based on retention time and it will
-  // clean immediately when force is true
-  val staleInprogress = optionsMap.getOrElse("stale_inprogress", "false").toBoolean
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
-    setAuditTable(carbonTable)
-    setAuditInfo(Map(
-      "force" -> forceTableClean.toString,
-      "internal" -> isInternalCleanCall.toString))
-    val viewSchemas =
-      MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
-    if (!viewSchemas.isEmpty) {
-      val commands = viewSchemas.asScala.map {
-        schema =>
-          val relationIdentifier = schema.getIdentifier
-          CarbonCleanFilesCommand(
-            Some(relationIdentifier.getDatabaseName),
-            Some(relationIdentifier.getTableName),
-            options,
-            isInternalCleanCall = true)
-      }.toList
-      commands.foreach(_.processMetadata(sparkSession))
-      cleanFileCommands = cleanFileCommands ++ commands
-    }
-    Seq.empty
-  }
+    tableName: String,
+    options: Map[String, String] = Map.empty,
+    isInternalCleanCall: Boolean = false)
+  extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
     }
-    val operationContext = new OperationContext
-    val cleanFilesPreEvent: CleanFilesPreEvent =
-      CleanFilesPreEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
-    if (tableName.isDefined) {
-      Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
-      if (forceClean) {
-        // empty the trash folder
-        if (CarbonProperties.getInstance().isCleanFilesForceAllowed) {
-          TrashUtil.emptyTrash(carbonTable.getTablePath)
-        } else {
-          LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
-              " recovered. It is disabled by default, to enable clean files with force option," +
-              " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
-            throw new RuntimeException("Clean files with force operation not permitted by default")
-        }
-      } else {
-        // clear trash based on timestamp
-        TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
-      }
-      if (carbonTable.isHivePartitionTable) {
-        CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
-      } else {
-        CleanFilesUtil.cleanStaleSegments(carbonTable)
-      }
-      if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get, forceClean, staleInprogress)
-      } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get, forceClean, staleInprogress)
-      }
-    } else {
-      cleanGarbageDataInAllTables(sparkSession)
-    }
-    if (cleanFileCommands != null) {
-      cleanFileCommands.foreach(_.processData(sparkSession))
-    }
-    val cleanFilesPostEvent: CleanFilesPostEvent =
-      CleanFilesPostEvent(carbonTable, staleInprogress, forceClean, sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
-    Seq.empty
-  }
-
-  private def deleteAllData(sparkSession: SparkSession,
-      databaseNameOp: Option[String], tableName: String, isForceDelete: Boolean,
-      cleanStaleInprogress: Boolean): Unit = {
-    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
-    val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    CarbonStore.cleanFiles(
-      dbName = dbName,
-      tableName = tableName,
-      tablePath = tablePath,
-      carbonTable = null, // in case of delete all data carbonTable is not required.
-      forceTableClean = forceTableClean,
-      isForceDelete = isForceDelete,
-      cleanStaleInprogress = cleanStaleInprogress)
-  }
-
-  private def cleanGarbageData(sparkSession: SparkSession,
-      databaseNameOp: Option[String], tableName: String, isForceDelete: Boolean,
-      cleanStaleInprogress: Boolean): Unit = {
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
-    val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-      Seq.empty[Expression],
-      sparkSession,
-      carbonTable)
-    CarbonStore.cleanFiles(
-      dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
-      tableName = tableName,
-      tablePath = carbonTable.getTablePath,
-      carbonTable = carbonTable,
-      forceTableClean = forceTableClean,
-      isForceDelete = isForceDelete,
-      cleanStaleInprogress = cleanStaleInprogress,
-      currentTablePartitions = partitions,
-      truncateTable = truncateTable)
-  }
 
-  // Clean garbage data in all tables in all databases
-  private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
-    try {
-      val databases = sparkSession.sessionState.catalog.listDatabases()
-      databases.foreach(dbName => {
-        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
-        CommonUtil.cleanInProgressSegments(databaseLocation, dbName)
-      })
-    } catch {
-      case e: Throwable =>
-        // catch all exceptions to avoid failure
-        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-          .error("Failed to clean in progress segments", e)
+    val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
+    val postEvent = CleanFilesPostEvent(carbonTable, sparkSession, options)
+    withEvents(preEvent, postEvent) {
+      DataTrashManager.cleanGarbageData(
+        carbonTable,
+        options.getOrElse("force", "false").toBoolean,
+        options.getOrElse("stale_inprogress", "false").toBoolean,
+        CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, carbonTable))
     }
+
+    Seq.empty
   }
 
   override protected def opName: String = "CLEAN FILES"
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 353f49c..80c41ba 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -184,10 +184,6 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
           getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
       }
     }
-    // Delete stale segment folders that are not in table status but are physically present in
-    // the Fact folder
-    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
     var isUpdateTableStatusRequired = false
     val uuid = ""
     try {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
index b114191..8be3555 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
@@ -87,10 +87,6 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
       options = options)
     val (timeStampFormat, dateFormat) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(
       carbonLoadModel)
-    // Delete stale segment folders that are not in table status but are physically present in
-    // the Fact folder
-    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
     var isUpdateTableStatusRequired = false
     val uuid = ""
     try {
@@ -107,8 +103,6 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
           updateModel = updateModel,
           operationContext = operationContext)
 
-      // Clean up the old invalid segment data before creating a new entry for new load.
-      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions, false, false)
       // add the start entry for the new load in the table status file
       if ((updateModel.isEmpty || updateModel.isDefined)
           && !table.isHivePartitionTable) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 65641b3..299b17b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -25,10 +25,10 @@ import scala.collection.mutable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping}
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
-import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
-import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils, SparkUtil}
+import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -40,10 +40,9 @@ import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 
 case class CarbonLoadDataCommand(databaseNameOp: Option[String],
@@ -105,10 +104,6 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
     val (tf, df) = CommonLoadUtils.getTimeAndDateFormatFromLoadModel(carbonLoadModel)
     timeStampFormat = tf
     dateFormat = df
-    // Delete stale segment folders that are not in table status but are physically present in
-    // the Fact folder
-    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
     var isUpdateTableStatusRequired = false
     val uuid = ""
     try {
@@ -124,8 +119,6 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
           isDataFrame = false,
           updateModel = None,
           operationContext = operationContext)
-      // Clean up the old invalid segment data before creating a new entry for new load.
-      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions, false, false)
       // add the start entry for the new load in the table status file
       if (!table.isHivePartitionTable) {
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
index ab481f8..0bd36e7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
@@ -18,39 +18,34 @@
 package org.apache.spark.sql.execution.command.mutation
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.execution.command.{DataCommand, TruncateTableCommand}
-import org.apache.spark.sql.execution.command.management.CarbonCleanFilesCommand
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.command.{Checker, DataCommand, TruncateTableCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.trash.DataTrashManager
 
 case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataCommand {
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = CarbonEnv.getDatabaseName(child.tableName.database)(sparkSession)
-    val tableName = child.tableName.table
-    setAuditTable(dbName, tableName)
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      throw new NoSuchTableException(dbName, tableName)
-    }
-    if (null == relation.carbonTable) {
-      LOGGER.error(s"Truncate table failed. table not found: $dbName.$child.tableName.table")
-      throw new NoSuchTableException(dbName, child.tableName.table)
+    Checker.validateTableExists(child.tableName.database, child.tableName.table, sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(
+      child.tableName.database, child.tableName.table)(sparkSession)
+    setAuditTable(carbonTable)
+    if (!carbonTable.isTransactionalTable) {
+      LOGGER.error(s"Unsupported truncate non-transactional table")
+      throw new MalformedCarbonCommandException(
+        "Unsupported truncate non-transactional table")
     }
     if (child.partitionSpec.isDefined) {
       throw new MalformedCarbonCommandException(
         "Unsupported truncate table with specified partition")
     }
-    CarbonCleanFilesCommand(
-      databaseNameOp = Option(dbName),
-      tableName = Option(tableName),
-      None,
-      truncateTable = true
-    ).run(sparkSession)
+    SegmentStatusManager.truncateTable(carbonTable)
+    Seq.empty
   }
 
   override protected def opName = "TRUNCATE TABLE"
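With this change TRUNCATE TABLE no longer re-runs the clean files command: it only updates the table status through SegmentStatusManager.truncateTable, and the physical data is reclaimed later by an explicit CLEAN FILES, in line with the trash-manager design. A hedged usage sketch (the table name is illustrative, and the exact status transition is an assumption based on the metadata-only call above):

    spark.sql("TRUNCATE TABLE default.demo_table")         // invalidates existing segments in table status
    spark.sql("CLEAN FILES FOR TABLE default.demo_table")  // later removes the truncated data physically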
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 37f588c..574a651 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -354,7 +354,7 @@ object DeleteExecution {
   }
 
   // all or none : update status file, only if complete delete operation is successful.
-  def checkAndUpdateStatusFiles(
+  private def checkAndUpdateStatusFiles(
       executorErrors: ExecutionErrors,
       res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]],
       carbonTable: CarbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 4147f4b..0289254 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.index.DropIndexCommand
 import org.apache.spark.sql.execution.command.view.CarbonDropMVCommand
 import org.apache.spark.sql.hive.CarbonFileMetastore
+import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -165,13 +166,7 @@ case class CarbonDropTableCommand(
         && carbonTable != null
         && !(carbonTable.isMV && !dropChildTable)) {
       // delete the table folder
-      val tablePath = carbonTable.getTablePath
-      // delete table data only if it is not external table
-      if (FileFactory.isFileExist(tablePath) &&
-          !(carbonTable.isExternalTable || carbonTable.isFileLevelFormat)) {
-        val file = FileFactory.getCarbonFile(tablePath)
-        CarbonUtil.deleteFoldersAndFilesSilent(file)
-      }
+      CarbonInternalMetastore.deleteTableDirectory(carbonTable)
       // Delete lock directory if external lock path is specified.
       if (CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_PATH,
         CarbonCommonConstants.LOCK_PATH_DEFAULT).toLowerCase
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index dc385e6..d16170f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -42,8 +42,6 @@ import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.compression.CompressorFactory
-import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 
@@ -519,7 +517,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       case databaseName ~ tableName ~ optionList =>
         CarbonCleanFilesCommand(
           CarbonParserUtil.convertDbNameToLowerCase(databaseName),
-          Option(tableName.toLowerCase()), optionList)
+          tableName.toLowerCase(),
+          optionList.map(_.toMap).getOrElse(Map.empty))
     }
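The parser now flattens the OPTIONS list into a Map before building the command, and the table name becomes a plain String instead of an Option. For an example statement the produced command is roughly (values are illustrative):

    // CLEAN FILES FOR TABLE t1 OPTIONS('force'='false', 'stale_inprogress'='true')
    CarbonCleanFilesCommand(
      databaseNameOp = None,
      tableName = "t1",
      options = Map("force" -> "false", "stale_inprogress" -> "true"))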
 
   protected lazy val explainPlan: Parser[LogicalPlan] =
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index 869bd24..e12a35b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -21,7 +21,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.command.management.CarbonCleanFilesCommand
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 
@@ -30,11 +32,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{CleanFilesPostEvent, Event, OperationContext, OperationEventListener}
+import org.apache.carbondata.view.MVManagerInSpark
 
 class CleanFilesPostEventListener extends OperationEventListener with Logging {
 
@@ -48,22 +50,51 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean): Unit = {
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+        Seq.empty[Expression],
+        sparkSession,
+        indexTable)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+        indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress,
+        true)
+      cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+    }
+  }
+
+  private def cleanFilesForMv(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      options: Map[String, String]): Unit = {
+    val viewSchemas = MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
+    if (!viewSchemas.isEmpty) {
+      viewSchemas.asScala.map { schema =>
+        CarbonCleanFilesCommand(
+          Some(schema.getIdentifier.getDatabaseName),
+          schema.getIdentifier.getTableName,
+          options,
+          isInternalCleanCall = true)
+      }.foreach(_.run(sparkSession))
     }
   }
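So a single CLEAN FILES on the main table now cascades from this post-event listener: secondary index tables are cleaned directly via SegmentStatusManager, while each materialized view re-runs the command internally with the same options. The internal call for one MV would look roughly like this (the names are illustrative):

    CarbonCleanFilesCommand(
      Some("default"),
      "demo_table_mv",
      Map("force" -> "false"),
      isInternalCleanCall = true).run(sparkSession)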
 
@@ -71,7 +102,7 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
    * This method added to clean the segments which are success in SI and may be compacted or marked
    * for delete in main table, which can happen in case of concurrent scenarios.
    */
-  def cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable: CarbonTable,
+  private def cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable: CarbonTable,
       mainTable: CarbonTable): Unit = {
     val mainTableStatusLock: ICarbonLock = CarbonLockFactory
       .getCarbonLockObj(mainTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
index ba892ab..085e806 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SIDropEventListener.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events.{DropTablePreEvent, Event, OperationContext, OperationEventListener}
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/hive/CarbonInternalMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/hive/CarbonInternalMetastore.scala
index 539c5f2..04c6ecf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/hive/CarbonInternalMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/hive/CarbonInternalMetastore.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.indextable.{IndexMetadata, IndexTableInfo}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
 
 object CarbonInternalMetastore {
 
@@ -339,15 +338,11 @@ object CarbonInternalMetastore {
     keysParts
   }
 
-  def deleteTableDirectory(dbName: String, tableName: String,
-    sparkSession: SparkSession): Unit = {
-    val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val metadataFilePath =
-      CarbonTablePath.getMetadataPath(tablePath)
-    if (FileFactory.isFileExist(metadataFilePath)) {
-      val file = FileFactory.getCarbonFile(metadataFilePath)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+  def deleteTableDirectory(carbonTable: CarbonTable): Unit = {
+    if (FileFactory.isFileExist(carbonTable.getTablePath) &&
+      !(carbonTable.isExternalTable || carbonTable.isFileLevelFormat)) {
+      val file = FileFactory.getCarbonFile(carbonTable.getTablePath)
+      CarbonUtil.deleteFoldersAndFilesSilent(file)
     }
   }
 }
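
Note: a minimal calling sketch for the refactored deleteTableDirectory (database and table names are illustrative). The caller now resolves the CarbonTable first, and external or file-level-format tables are skipped:

    // the physical directory is derived from the resolved table itself
    val indexTable = CarbonEnv.getCarbonTable(Some("default"), "si_table")(sparkSession)
    CarbonInternalMetastore.deleteTableDirectory(indexTable)
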
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index cdbc711..582ecfe 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -482,17 +482,6 @@ object SecondaryIndexCreator {
           }.${ secondaryIndexModel.secondaryIndex.indexName } SET
              |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin).collect()
       }
-      try {
-        if (!isCompactionCall) {
-          SegmentStatusManager
-            .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null, false, false)
-        }
-      } catch {
-        case e: Exception =>
-          LOGGER
-            .error("Problem while cleaning up stale folder for index table " +
-                   secondaryIndexModel.secondaryIndex.indexName, e)
-      }
       // close the executor service
       if (null != executorService) {
         executorService.shutdownNow()
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
index f07a30e..16898b2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.optimizer.CarbonFilters
 
-import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.trash.DataTrashManager
 
 /**
  * clean files api
@@ -29,33 +30,17 @@ import org.apache.carbondata.api.CarbonStore
 object CleanFiles {
 
   /**
-   * The method deletes all data if forceTableCLean <true> and lean garbage segment
-   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
-   *
-   * @param spark           : Database name
-   * @param dbName          : Table name
-   * @param tableName       : Table path
-   * @param forceTableClean : if true, it deletes the table and its contents with force.It does not
-   *                        drop table from hive metastore so should be very careful to use it.
+   * This method cleans garbage data
    */
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
-     forceTableClean: Boolean = false, isForceDeletion: Boolean = false,
-     cleanStaleInProgress: Boolean = false ): Unit = {
+      isForceDeletion: Boolean = false, cleanStaleInProgress: Boolean = false ): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark)
-    val carbonTable = if (!forceTableClean) {
-      CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
-    } else {
-      null
-    }
-    CarbonStore.cleanFiles(
-      dbName = dbName,
-      tableName = tableName,
-      tablePath = tablePath,
-      carbonTable = carbonTable,
-      forceTableClean = forceTableClean,
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
+    DataTrashManager.cleanGarbageData(
+      carbonTable,
       isForceDeletion,
-      cleanStaleInProgress)
+      cleanStaleInProgress,
+      CarbonFilters.getPartitions(Seq.empty[Expression], spark, carbonTable))
   }
 
   def main(args: Array[String]): Unit = {
@@ -67,15 +52,13 @@ object CleanFiles {
 
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
-    var forceTableClean = false
     var isForceDeletion = false
     var cleanInprogress = false
-    if (args.length > 4) {
-      forceTableClean = args(2).toBoolean
-      isForceDeletion = args(3).toBoolean
-      cleanInprogress = args(4).toBoolean
+    if (args.length > 3) {
+      isForceDeletion = args(2).toBoolean
+      cleanInprogress = args(3).toBoolean
     }
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    cleanFiles(spark, dbName, tableName, forceTableClean, isForceDeletion, cleanInprogress)
+    cleanFiles(spark, dbName, tableName, isForceDeletion, cleanInprogress)
   }
 }
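
Note: a sketch of the trimmed entry points (store path, database and table names are illustrative). The forceTableClean flag is gone; only force deletion and stale in-progress cleanup remain:

    // programmatic API
    CleanFiles.cleanFiles(spark, "default", "cleantest",
      isForceDeletion = false, cleanStaleInProgress = false)
    // command-line form: <storePath> <db.table> [<isForceDeletion> <cleanStaleInProgress>]
    CleanFiles.main(Array("/tmp/carbon/store", "default.cleantest", "false", "false"))
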
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index 5615142..5c288d7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -50,7 +50,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     assert(segmentNumber1 == 4)
     sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true')").show
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
-    assert(0 == segmentNumber2)
+    assert(4 == segmentNumber2)
     assert(!FileFactory.isFileExist(trashFolderPath))
     // no carbondata file is added to the trash
     assert(getFileCountInTrashFolder(trashFolderPath) == 0)
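
Note: for contrast with the updated assertion, a hedged sketch of the variant that still removes such segments (option names follow docs/clean-files.md; force additionally requires the CARBON_CLEAN_FILES_FORCE_ALLOWED property):

    sql("CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true', 'stale_inprogress'='true')")
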
diff --git a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 4ee1542..9d982a1 100644
--- a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -120,7 +120,7 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
     DeleteSegmentById.main(Array(s"${location}", table, "0"))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
-    CleanFiles.main(Array(s"${location}", table, "false", "true", "true"))
+    CleanFiles.main(Array(s"${location}", table, "true", "true"))
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
@@ -137,11 +137,15 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
     val table = "carbon_table4"
     dropTable(table)
     createAndLoadTestTable(table, "csv_table")
-    CleanFiles.main(Array(s"${location}", table, "true", "false", "false"))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    CleanFiles.main(Array(s"${location}", table, "true", "false"))
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
     val tablePath = carbonTable.getTablePath
     val f = new File(tablePath)
-    assert(!f.exists())
+    assert(f.exists())
 
     dropTable(table)
   }
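
Note: a condensed sketch of the guard these tests now exercise (location and table name come from the surrounding fixture). Force deletion through the CleanFiles tool is honoured only while CARBON_CLEAN_FILES_FORCE_ALLOWED is enabled:

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
    try {
      CleanFiles.main(Array(s"$location", "carbon_table4", "true", "false"))
    } finally {
      CarbonProperties.getInstance()
        .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
    }
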
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index 5c41dd7..ff023ce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -19,29 +19,14 @@ package org.apache.carbondata.processing.loading;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
-import org.apache.carbondata.core.locks.CarbonLockUtil;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
@@ -54,105 +39,6 @@ public class TableProcessingOperations {
       LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
 
   /**
-   * delete folder which metadata no exist in tablestatus
-   * this method don't check tablestatus history.
-   */
-  public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
-      final boolean isCompactionFlow) throws IOException {
-    String metaDataLocation = carbonTable.getMetadataPath();
-    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
-    if (FileFactory.isFileExist(partitionPath)) {
-      // list all segments before reading tablestatus file.
-      CarbonFile[] allSegments = FileFactory.getCarbonFile(partitionPath).listFiles();
-      // there is no segment
-      if (allSegments == null || allSegments.length == 0) {
-        return;
-      }
-      int retryCount = CarbonLockUtil
-          .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
-      int maxTimeout = CarbonLockUtil
-          .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
-      ICarbonLock carbonTableStatusLock = CarbonLockFactory
-          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
-      try {
-        if (carbonTableStatusLock.lockWithRetries(retryCount, maxTimeout)) {
-          LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-          // there is no segment or failed to read tablestatus file.
-          // so it should stop immediately.
-          if (details == null || details.length == 0) {
-            return;
-          }
-          Set<String> metadataSet = new HashSet<>(details.length);
-          for (LoadMetadataDetails detail : details) {
-            metadataSet.add(detail.getLoadName());
-          }
-          List<CarbonFile> staleSegments = new ArrayList<>(allSegments.length);
-          Set<String> staleSegmentsId = new HashSet<>(allSegments.length);
-          for (CarbonFile segment : allSegments) {
-            String segmentName = segment.getName();
-            // check segment folder pattern
-            if (segmentName.startsWith(CarbonTablePath.SEGMENT_PREFIX)) {
-              String[] parts = segmentName.split(CarbonCommonConstants.UNDERSCORE);
-              if (parts.length == 2) {
-                boolean isOriginal = !parts[1].contains(".");
-                if (isCompactionFlow) {
-                  // in compaction flow,
-                  // it should be merged segment and segment metadata doesn't exists
-                  if (!isOriginal && !metadataSet.contains(parts[1])) {
-                    staleSegments.add(segment);
-                    staleSegmentsId.add(parts[1]);
-                  }
-                } else {
-                  // in loading flow,
-                  // it should be original segment and segment metadata doesn't exists
-                  if (isOriginal && !metadataSet.contains(parts[1])) {
-                    staleSegments.add(segment);
-                    staleSegmentsId.add(parts[1]);
-                  }
-                }
-              }
-            }
-          }
-          // delete segment folders one by one
-          for (CarbonFile staleSegment : staleSegments) {
-            try {
-              CarbonUtil.deleteFoldersAndFiles(staleSegment);
-            } catch (IOException | InterruptedException e) {
-              LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
-            }
-          }
-          if (staleSegments.size() > 0) {
-            // get the segment metadata path
-            String segmentFilesLocation =
-                CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
-            // delete the segment metadata files also
-            CarbonFile[] staleSegmentMetadataFiles = FileFactory.getCarbonFile(segmentFilesLocation)
-                .listFiles(file -> (staleSegmentsId
-                    .contains(file.getName().split(CarbonCommonConstants.UNDERSCORE)[0])));
-            for (CarbonFile staleSegmentMetadataFile : staleSegmentMetadataFiles) {
-              staleSegmentMetadataFile.delete();
-            }
-          }
-        } else {
-          String errorMessage =
-              "Not able to acquire the Table status lock for partial load deletion for table "
-                  + carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
-          if (isCompactionFlow) {
-            LOGGER.error(errorMessage + ", retry compaction");
-            throw new RuntimeException(errorMessage + ", retry compaction");
-          } else {
-            LOGGER.error(errorMessage);
-          }
-        }
-      } finally {
-        carbonTableStatusLock.unlock();
-      }
-    }
-  }
-
-  /**
    *
    * This method will delete the local data load folder location after data load is complete
    *
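
Note: the stale-segment detection removed above is, conceptually, the comparison below; with this PR it no longer runs inline during load or compaction and is left to the clean files trash manager. The helper uses plain Scala types and is not CarbonData API:

    // segment folders whose id is not referenced by tablestatus are the trash candidates
    def staleSegmentDirs(segmentDirNames: Seq[String],
        loadNamesInTableStatus: Set[String]): Seq[String] =
      segmentDirNames.filter { name =>
        name.startsWith("Segment_") &&
          !loadNamesInTableStatus.contains(name.stripPrefix("Segment_"))
      }
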
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index e6b6df7..e41b615 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -89,10 +88,21 @@ public final class CarbonLoaderUtil {
     }
   }
 
-  public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
+  /**
+   * Deletes the segment folder after a data loading failure
+   */
+  public static void deleteSegmentForFailure(CarbonLoadModel loadModel) {
+    int currentLoad = Integer.parseInt(loadModel.getSegmentId());
     String segmentPath = CarbonTablePath.getSegmentPath(
         loadModel.getTablePath(), currentLoad + "");
-    deleteStorePath(segmentPath);
+    try {
+      if (FileFactory.isFileExist(segmentPath)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+        CarbonUtil.deleteFoldersAndFiles(carbonFile);
+      }
+    } catch (IOException | InterruptedException e) {
+      LOGGER.error("Unable to delete segment: " + segmentPath + ", " + e.getMessage(), e);
+    }
   }
 
   /**
@@ -130,17 +140,6 @@ public final class CarbonLoaderUtil {
     return true;
   }
 
-  public static void deleteStorePath(String path) {
-    try {
-      if (FileFactory.isFileExist(path)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(path);
-        CarbonUtil.deleteFoldersAndFiles(carbonFile);
-      }
-    } catch (IOException | InterruptedException e) {
-      LOGGER.error("Unable to delete the given path :: " + e.getMessage(), e);
-    }
-  }
-
   /**
    * This API will write the load level metadata for the loadmanagement module inorder to
    * manage the load and query execution management smoothly.
@@ -252,8 +251,6 @@ public final class CarbonLoaderUtil {
     int maxTimeout = CarbonLockUtil
         .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
             CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
-    // TODO only for overwrite scene
-    final List<LoadMetadataDetails> staleLoadMetadataDetails = new ArrayList<>();
     try {
       if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
         LOGGER.info(
@@ -264,7 +261,6 @@ public final class CarbonLoaderUtil {
                 CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        List<CarbonFile> staleFolders = new ArrayList<>();
         Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
         // create a new segment Id if load has just begun else add the already generated Id
         if (loadStartEntry) {
@@ -323,10 +319,6 @@ public final class CarbonLoaderUtil {
             for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
               if (entry.getSegmentStatus() != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
                 entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
-                // For insert overwrite, we will delete the old segment folder immediately
-                // So collect the old segments here
-                addToStaleFolders(identifier, staleFolders, entry);
-                staleLoadMetadataDetails.add(entry);
               }
             }
           }
@@ -337,11 +329,6 @@ public final class CarbonLoaderUtil {
           }
           listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
         }
-        // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
-        // so empty segment folder should be deleted
-        if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
-          addToStaleFolders(identifier, staleFolders, newMetaEntry);
-        }
 
         for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
           // if the segments is in the list of marked for delete then update the status.
@@ -354,47 +341,8 @@ public final class CarbonLoaderUtil {
                     + CarbonTablePath.SEGMENT_EXT);
           }
         }
-
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
             .toArray(new LoadMetadataDetails[0]));
-
-        // Delete all old stale segment folders
-        for (CarbonFile staleFolder : staleFolders) {
-          // try block is inside for loop because even if there is failure in deletion of 1 stale
-          // folder still remaining stale folders should be deleted
-          try {
-            CarbonUtil.deleteFoldersAndFiles(staleFolder);
-          } catch (IOException | InterruptedException e) {
-            LOGGER.error("Failed to delete stale folder: " + e.getMessage(), e);
-          }
-        }
-        if (!staleLoadMetadataDetails.isEmpty()) {
-          final String segmentFileLocation =
-              CarbonTablePath.getSegmentFilesLocation(identifier.getTablePath())
-                  + CarbonCommonConstants.FILE_SEPARATOR;
-          final String segmentLockFileLocation =
-              CarbonTablePath.getLockFilesDirPath(identifier.getTablePath())
-                  + CarbonCommonConstants.FILE_SEPARATOR;
-          for (LoadMetadataDetails staleLoadMetadataDetail : staleLoadMetadataDetails) {
-            try {
-              CarbonUtil.deleteFoldersAndFiles(
-                  FileFactory.getCarbonFile(segmentFileLocation
-                      + staleLoadMetadataDetail.getSegmentFile())
-              );
-            } catch (IOException | InterruptedException e) {
-              LOGGER.error("Failed to delete segment file: " + e.getMessage(), e);
-            }
-            try {
-              CarbonUtil.deleteFoldersAndFiles(
-                  FileFactory.getCarbonFile(segmentLockFileLocation
-                      + CarbonTablePath.addSegmentPrefix(staleLoadMetadataDetail.getLoadName())
-                      + LockUsage.LOCK)
-              );
-            } catch (IOException | InterruptedException e) {
-              LOGGER.error("Failed to delete segment lock file: " + e.getMessage(), e);
-            }
-          }
-        }
         status = true;
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
@@ -414,17 +362,6 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
-      List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
-    String path = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), entry.getLoadName());
-    // add to the deletion list only if file exist else HDFS file system will throw
-    // exception while deleting the file if file path does not exist
-    if (FileFactory.isFileExist(path)) {
-      staleFolders.add(FileFactory.getCarbonFile(path));
-    }
-  }
-
   /**
    * Method to create new entry for load in table status file
    *
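
Note: a brief sketch of the renamed failure hook (imports shown for context; the surrounding load flow is illustrative). Only the failed segment's folder is removed eagerly; the stale-folder deletions dropped from recordNewLoadMetadata above are deferred to the clean files trash manager instead of being done under the table status lock:

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    import org.apache.carbondata.processing.util.CarbonLoaderUtil

    def onLoadFailure(loadModel: CarbonLoadModel): Unit = {
      // deletes only the segment folder of the failed load, if it exists
      CarbonLoaderUtil.deleteSegmentForFailure(loadModel)
    }
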

