carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4067]: Removing force option in clean files command and changing behaviour when MFD, Compacted and stale Inprogress segments can be deleted
Date Sat, 05 Dec 2020 10:25:45 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e58bbb  [CARBONDATA-4067]: Removing force option in clean files command and changing behaviour when MFD, Compacted and stale Inprogress segments can be deleted
7e58bbb is described below

commit 7e58bbb5b28f497b6597dc58573cff902ee519b9
Author: Vikram Ahuja <vikramahuja8803@gmail.com>
AuthorDate: Fri Nov 6 12:59:43 2020 +0530

    [CARBONDATA-4067]: Removing force option in clean files command and changing behaviour when MFD, Compacted and stale Inprogress segments can be deleted
    
    Why is this PR needed?
    Change the behaviour change for clean files operation
    Old behaviour: Clean files command is by default force option = true and ignores query timeout. So, the MFD, Compacted and Inprogress segments are removed immediately when clean files is called. The user can by mistake call delete some important data
    
    What changes were proposed in this PR?
    Instead of just a force option deleting all MFD, Compacted and Insert In Progress segments, dividing them into 2 parameters, forceClean and cleanStaleInProgress. forceClean parameter will clean data immediately and cleanStaleInProgress will decide if stale InProgress segments can be deleted. The behaviour is described below.
    
    New Behaviour: clean files is no longer force by default and depends on 2 variables(forceClean and cleanStaleInProgress).
    default clean files behaviour(clean files on table t1): clean MFD and Compacted segments will depend on query timeout(1 hr) and trashRetentionTimeout(7 days, default). For example:
    If trashRetentionTimeout is 7 days and query timeout is 1 hr--> Delete after 7 days
    If trashRetentionTimeout is 0 days and query timeout is 1 hr--> Delete after 1 hr
    It will also empty trash based on trash retention time.
    clean files on table t1 options('force'='true'): clean MFD and Compacted segments immediately(Do not check for any timeout) and empty trash immediately.
    clean files on table t1 options('clean_inprgress'='true') : clean stale inprogress, MFD and Compacted segments depends on trashRetentionTimeout, after 7 days(default behaviour) and empty trash based on trash retention time.
    clean files on table t1 options('clean_inprgress'='true', 'force'='true') : clean MFD, Compacted and stale inprogress segments immediately(Do not check for any timeout) and empty trash immediately.
    
    Does this PR introduce any user interface change?
    Yes. Document is updated.
    
    Is any new testcase added?
    No (previous test cases changed)
    
    This closes #4035
---
 .../core/statusmanager/SegmentStatusManager.java   |  58 +++---------
 .../carbondata/core/util/CleanFilesUtil.java       |   3 +
 .../carbondata/core/util/DeleteLoadFolders.java    |  97 ++++++++++++-------
 docs/clean-files.md                                |  21 ++++-
 .../TestIndexModelWithAggQueries.scala             |   8 +-
 .../secondaryindex/TestIndexModelWithIUD.scala     |   8 +-
 .../testsuite/secondaryindex/TestIndexRepair.scala |  30 +++---
 .../secondaryindex/TestSIWithSecondaryIndex.scala  |   6 ++
 .../org/apache/carbondata/api/CarbonStore.scala    |  19 +++-
 .../carbondata/events/CleanFilesEvents.scala       |   3 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |   3 +-
 .../org/apache/carbondata/view/MVRefresher.scala   |   2 +-
 .../management/CarbonCleanFilesCommand.scala       |  24 +++--
 .../management/CarbonInsertIntoWithDf.scala        |   2 +-
 .../command/management/CarbonLoadDataCommand.scala |   2 +-
 .../events/CleanFilesPostEventListener.scala       |   9 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |   2 +-
 .../org/apache/spark/sql/test/util/QueryTest.scala |  10 ++
 .../scala/org/apache/spark/util/CleanFiles.scala   |  15 ++-
 .../bloom/BloomCoarseGrainIndexFunctionSuite.scala |   6 +-
 .../testsuite/binary/TestBinaryDataType.scala      |   6 +-
 .../dataload/TestLoadDataWithCompression.scala     |   6 +-
 .../allqueries/InsertIntoCarbonTableTestCase.scala |   6 +-
 .../cleanfiles/TestCleanFileCommand.scala          | 103 ++++++++++++---------
 .../TestCleanFilesCommandPartitionTable.scala      |  80 +++++++---------
 .../CompactionSupportGlobalSortFunctionTest.scala  |  16 ++--
 .../TableLevelCompactionOptionTest.scala           |  12 ++-
 .../FlatFolderTableLoadingTestCase.scala           |  13 ++-
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  |   8 +-
 .../testsuite/segment/ShowSegmentTestCase.scala    |  22 ++++-
 .../TestSegmentReadingForMultiThreading.scala      |   6 ++
 .../StandardPartitionGlobalSortTestCase.scala      |  12 ++-
 .../StandardPartitionTableCleanTestCase.scala      |   7 +-
 .../StandardPartitionTableCompactionTestCase.scala |   7 +-
 .../view/rewrite/TestPartitionWithMV.scala         |   7 +-
 .../org/apache/spark/util/CarbonCommandSuite.scala |   9 +-
 36 files changed, 398 insertions(+), 250 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6fc0754..c062a02 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -990,46 +990,6 @@ public class SegmentStatusManager {
     return newListMetadata;
   }
 
-  private static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
-      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
-
-    try {
-
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-          Charset.forName(DEFAULT_CHARSET)));
-
-      // make the table status file smaller by removing fields that are default value
-      listOfLoadFolderDetails.forEach(LoadMetadataDetails::removeUnnecessaryField);
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } catch (IOException ie) {
-      LOG.error("Error message: " + ie.getLocalizedMessage());
-      writeOperation.setFailed();
-      throw ie;
-    } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        LOG.error("error in  flushing ");
-
-      }
-      CarbonUtil.closeStreams(brWriter);
-      writeOperation.close();
-    }
-  }
-
   private static class ReturnTuple {
     LoadMetadataDetails[] details;
     boolean isUpdateRequired;
@@ -1040,16 +1000,18 @@ public class SegmentStatusManager {
   }
 
   private static ReturnTuple isUpdateRequired(boolean isForceDeletion, CarbonTable carbonTable,
-      AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details) {
+      AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details,
+      boolean cleanStaleInProgress) {
     // Delete marked loads
     boolean isUpdateRequired = DeleteLoadFolders
         .deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, isForceDeletion, details,
-            carbonTable.getMetadataPath());
+            carbonTable.getMetadataPath(), cleanStaleInProgress);
     return new ReturnTuple(details, isUpdateRequired);
   }
 
   public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean isForceDeletion,
-      List<PartitionSpec> partitionSpecs) throws IOException {
+      List<PartitionSpec> partitionSpecs, boolean cleanStaleInprogress,
+      boolean isCleanFilesOperation) throws IOException {
     LoadMetadataDetails[] metadataDetails =
         SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
     // delete the expired segment lock files
@@ -1059,7 +1021,8 @@ public class SegmentStatusManager {
       boolean updateCompletionStatus = false;
       LoadMetadataDetails[] newAddedLoadHistoryList = null;
       ReturnTuple tuple =
-          isUpdateRequired(isForceDeletion, carbonTable, identifier, metadataDetails);
+          isUpdateRequired(isForceDeletion, carbonTable, identifier, metadataDetails,
+              cleanStaleInprogress);
       if (tuple.isUpdateRequired) {
         ICarbonLock carbonTableStatusLock =
             CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
@@ -1079,7 +1042,8 @@ public class SegmentStatusManager {
             LoadMetadataDetails[] details =
                 SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
             ReturnTuple tuple2 =
-                isUpdateRequired(isForceDeletion, carbonTable, identifier, details);
+                isUpdateRequired(isForceDeletion, carbonTable,
+                    identifier, details, cleanStaleInprogress);
             if (!tuple2.isUpdateRequired) {
               return;
             }
@@ -1095,7 +1059,7 @@ public class SegmentStatusManager {
             // if execute command 'clean files' or the number of invisible segment info
             // exceeds the value of 'carbon.invisible.segments.preserve.count',
             // it need to append the invisible segment list to 'tablestatus.history' file.
-            if (isForceDeletion || (invisibleSegmentCnt > invisibleSegmentPreserveCnt)) {
+            if (isCleanFilesOperation || invisibleSegmentCnt > invisibleSegmentPreserveCnt) {
               TableStatusReturnTuple tableStatusReturn = separateVisibleAndInvisibleSegments(
                   tuple2.details, latestMetadata, invisibleSegmentCnt, maxSegmentId);
               LoadMetadataDetails[] oldLoadHistoryList = readLoadHistoryMetadata(
@@ -1136,7 +1100,7 @@ public class SegmentStatusManager {
           if (updateCompletionStatus) {
             DeleteLoadFolders
                 .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
-                    isForceDeletion, partitionSpecs);
+                  isForceDeletion, partitionSpecs, cleanStaleInprogress);
           }
         }
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 6311d3a..bdfab6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -158,6 +158,9 @@ public class CleanFilesUtil {
     }
     LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(carbonTable
         .getMetadataPath());
+    if (details == null || details.length == 0) {
+      return;
+    }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
     List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index d509959..e1b4663 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -69,20 +69,23 @@ public final class DeleteLoadFolders {
   public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
       LoadMetadataDetails[] newAddedLoadHistoryList,
       boolean isForceDelete,
-      List<PartitionSpec> specs) {
+      List<PartitionSpec> specs,
+      boolean cleanStaleInProgress) {
     LoadMetadataDetails[] currentDetails =
         SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
     physicalFactAndMeasureMetadataDeletion(carbonTable,
         currentDetails,
         isForceDelete,
         specs,
-        currentDetails);
+        currentDetails,
+        cleanStaleInProgress);
     if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
       physicalFactAndMeasureMetadataDeletion(carbonTable,
           newAddedLoadHistoryList,
           isForceDelete,
           specs,
-          currentDetails);
+          currentDetails,
+          cleanStaleInProgress);
     }
   }
 
@@ -90,13 +93,13 @@ public final class DeleteLoadFolders {
    * Delete the invalid data physically from table.
    * @param carbonTable table
    * @param loadDetails Load details which need clean up
-   * @param isForceDelete is Force delete requested by user
+   * @param isForceDelete Force delete Compacted and MFD segments. it will empty the trash folder
    * @param specs Partition specs
    * @param currLoadDetails Current table status load details which are required for update manager.
    */
   private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
       LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
-      LoadMetadataDetails[] currLoadDetails) {
+      LoadMetadataDetails[] currLoadDetails, boolean cleanStaleInProgress) {
     List<TableIndex> indexes = new ArrayList<>();
     try {
       for (TableIndex index : IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)) {
@@ -113,7 +116,7 @@ public final class DeleteLoadFolders {
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
     for (final LoadMetadataDetails oneLoad : loadDetails) {
-      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete, cleanStaleInProgress)) {
         try {
           if (oneLoad.getSegmentFile() != null) {
             SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
@@ -173,40 +176,66 @@ public final class DeleteLoadFolders {
   }
 
   private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
-        SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
-        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
-        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
-      if (isForceDelete) {
-        return true;
-      }
-      long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
-      return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) && CarbonUpdateUtil
-          .isMaxQueryTimeoutExceeded(deletionTime);
+      boolean isForceDelete, boolean cleanStaleInProgress) {
+    if (oneLoad.getVisibility().equalsIgnoreCase("true")) {
+      return canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress);
     }
-
     return false;
   }
 
   private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
+      boolean isForceDelete, boolean cleanStaleInProgress) {
     // Check if the segment is added externally and path is set then do not delete it
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus()
-        || SegmentStatus.COMPACTED == oneLoad.getSegmentStatus()) && (oneLoad.getPath() == null
-        || oneLoad.getPath().equalsIgnoreCase("NA"))) {
-      if (isForceDelete) {
+    if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+      return canDeleteThisLoad(oneLoad,  isForceDelete, cleanStaleInProgress);
+    }
+    return false;
+  }
+
+  private static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
+      boolean isForceDelete, boolean cleanStaleInProgress) {
+    /*
+     * if cleanStaleInProgress == false and  isForceDelete == false, clean MFD and Compacted
+     *  segments will depend on query timeout(1 hr) and trashRetentionTimeout(7 days, default).
+     *  For example:
+     *  If trashRetentionTimeout is 7 days and query timeout is 1 hr--> Delete after 7 days
+     *  If trashRetentionTimeout is 0 days and query timeout is 1 hr--> Delete after 1 hr
+     *
+     * if cleanStaleInProgress == false and  isForceDelete == true, clean MFD and Compacted
+     *  segments immediately(Do not check for any timeout)
+     *
+     * if cleanStaleInProgress == true and  isForceDelete == false, clean Stale Inprogress, MFD and
+     *  compacted segments after 7 days(taking carbon.trash.retention.time value)
+     *
+     * if cleanStaleInProgress == true and  isForceDelete == true, clean MFD, Compacted and
+     *  stale inprogress segments immediately.(Do not check for any timeout)
+     */
+    if (isForceDelete) {
+      // immediately delete compacted and MFD
+      if (SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus
+          .MARKED_FOR_DELETE == oneLoad.getSegmentStatus()) {
         return true;
       }
-      long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
-
+      // immediately delete inprogress segments if cleanstaleinprogress is true
+      return cleanStaleInProgress && (SegmentStatus.INSERT_IN_PROGRESS == oneLoad
+          .getSegmentStatus() || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad
+          .getSegmentStatus());
+    }
+    long deletionTime = oneLoad.getModificationOrDeletionTimestamp();
+    // in case there is no deletion or modification timestamp, take the load start time as
+    // deleteTime
+    if (deletionTime == 0) {
+      deletionTime = oneLoad.getLoadStartTime();
+    }
+    if (SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() || SegmentStatus
+        .MARKED_FOR_DELETE == oneLoad.getSegmentStatus()) {
+      // delete MFD, compacted segments after checking trash timeout and query timeout
       return TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime) && CarbonUpdateUtil
-          .isMaxQueryTimeoutExceeded(deletionTime);
-
+        .isMaxQueryTimeoutExceeded(deletionTime);
     }
-
-    return false;
+    return (SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() || SegmentStatus
+        .INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus()) && cleanStaleInProgress &&
+        TrashUtil.isTrashRetentionTimeoutExceeded(deletionTime);
   }
 
   private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
@@ -221,12 +250,12 @@ public final class DeleteLoadFolders {
   }
 
   public static boolean deleteLoadFoldersFromFileSystem(
-      AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
-      LoadMetadataDetails[] details, String metadataPath) {
+      AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete, LoadMetadataDetails[]
+      details, String metadataPath, boolean cleanStaleInProgress) {
     boolean isDeleted = false;
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
-        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete, cleanStaleInProgress)) {
           ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
               CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
           try {
@@ -237,7 +266,7 @@ public final class DeleteLoadFolders {
                 LoadMetadataDetails currentDetails =
                     getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
                 if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
-                    isForceDelete)) {
+                    isForceDelete, cleanStaleInProgress)) {
                   oneLoad.setVisibility("false");
                   isDeleted = true;
                   LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
diff --git a/docs/clean-files.md b/docs/clean-files.md
index 14ffb60..db4885b 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -24,6 +24,7 @@ Clean files command is used to remove the Compacted, Marked For Delete ,In Progr
    ```
    CLEAN FILES FOR TABLE TABLE_NAME
    ```
+The above clean files command will clean Marked For Delete and Compacted segments depending on ```max.query.execution.time``` (default 1 hr) and ``` carbon.trash.retention.days``` (default 7 days). It will also delete the timestamp subdirectories from the trash folder after expiration day(default 7 day, can be configured)
 
 
 ### TRASH FOLDER
@@ -37,10 +38,24 @@ Clean files command is used to remove the Compacted, Marked For Delete ,In Progr
    ``` 
   Once the timestamp subdirectory is expired as per the configured expiration day value, that subdirectory is deleted from the trash folder in the subsequent clean files command.
 
-### FORCE DELETE TRASH
-The force option with clean files command deletes all the files and folders from the trash folder.
+### FORCE OPTION
+The force option with clean files command deletes all the files and folders from the trash folder and delete the Marked for Delete and Compacted segments immediately. Since Clean Files operation with force option will delete data that can never be recovered, the force option by default is disabled. Clean files with force option is only allowed when the carbon property ```carbon.clean.file.force.allowed``` is set to true. The default value of this property is false.
+                                                                                                                                                                       
+
 
   ```
   CLEAN FILES FOR TABLE TABLE_NAME options('force'='true')
   ```
-Since Clean Files operation with force option will delete data that can never be recovered, the force option by default is disabled. Clean files with force option is only allowed when the carbon property ```carbon.clean.file.force.allowed``` is set to true. The default value of this property is false.
\ No newline at end of file
+
+### STALE_INPROGRESS OPTION
+The stale_inprogress option deletes the stale Insert In Progress segments after the expiration of the property    ```carbon.trash.retention.days``` 
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true')
+  ```
+
+The stale_inprogress option with force option will delete Marked for delete, Compacted and stale Insert In progress immediately. It will also empty  the trash folder immediately.
+
+  ```
+  CLEAN FILES FOR TABLE TABLE_NAME options('stale_inprogress'='true', 'force'='true')
+  ```
\ No newline at end of file
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
index 5255ebd..45a2af3 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithAggQueries.scala
@@ -21,7 +21,9 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * test cases with secondary index and agg queries
@@ -161,7 +163,11 @@ class TestIndexModelWithAggQueries extends QueryTest with BeforeAndAfterAll {
       case Some(row) => assert(row.get(1).toString.contains("Marked for Delete"))
       case None => assert(false)
     }
-    sql("clean files for table clean")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("clean files for table clean options('force'='true')")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val mainTable = CarbonEnv.getCarbonTable(Some("default"), "clean")(sqlContext.sparkSession)
     val indexTable = CarbonEnv.getCarbonTable(Some("default"), "clean_index")(
       sqlContext.sparkSession)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
index 6f1b709..a18036d 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexModelWithIUD.scala
@@ -23,7 +23,9 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
 
 /**
@@ -59,7 +61,11 @@ class TestIndexModelWithIUD extends QueryTest with BeforeAndAfterAll {
              .equals(SegmentStatus.MARKED_FOR_DELETE.getMessage))
 
     // execute clean files
-    sql("clean files for table dest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("clean files for table dest options('force'='true')")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
 
     sql("show segments for table index_dest2").collect()
     val exception_index_dest1 = intercept[IndexOutOfBoundsException] {
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
index 5d6f816..6a0d268 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 
 /**
@@ -30,6 +32,8 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists indextable1 on maintable")
     sql("drop index if exists indextable2 on maintable")
     sql("drop table if exists maintable")
+        CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
   }
 
   test("reindex command after deleting segments from SI table") {
@@ -40,7 +44,7 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
     val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
     sql(s"""ALTER TABLE default.indextable1 SET
            |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
     val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
@@ -68,7 +72,7 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
 
     val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
     sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
-    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1 options('force'='true')")
     sql(s"""ALTER TABLE test.indextable1 SET
            |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
     val df1 = sql("select * from test.maintable where c = 'string2'").queryExecution.sparkPlan
@@ -95,7 +99,7 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
 
     val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
     val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     assert(preDeleteSegments!=postDeleteSegments)
     sql(s"""ALTER TABLE default.indextable1 SET
@@ -126,7 +130,7 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
 
     val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(1,2,3)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
     val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     assert(preDeleteSegments!=postDeleteSegments)
     sql(s"""ALTER TABLE default.indextable1 SET
@@ -150,9 +154,9 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
     val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
     sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
     sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE2 options('force'='true')")
     val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
     assert(preDeleteSegments!=postDeleteSegmentsIndexOne)
@@ -174,9 +178,9 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
     val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
     sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1 options('force'='true')")
     sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(1)")
-    sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE2 options('force'='true')")
     val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
     val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
     assert(preDeleteSegments != postDeleteSegmentsIndexOne)
@@ -204,9 +208,9 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
 
     val preDeleteSegmentsTableOne = sql("SHOW SEGMENTS FOR TABLE test.MAINTABLE1").count()
     sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
-    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1 options('force'='true')")
     sql("DELETE FROM TABLE test.INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
-    sql("CLEAN FILES FOR TABLE test.INDEXTABLE2")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE2 options('force'='true')")
     val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
     val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE2").count()
 
@@ -219,9 +223,9 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
 
     val preDeleteSegmentsTableTwo = sql("SHOW SEGMENTS FOR TABLE test.MAINTABLE2").count()
     sql("DELETE FROM TABLE test.INDEXTABLE3 WHERE SEGMENT.ID IN(1)")
-    sql("CLEAN FILES FOR TABLE test.INDEXTABLE3")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE3 options('force'='true')")
     sql("DELETE FROM TABLE test.INDEXTABLE4 WHERE SEGMENT.ID IN(0,1)")
-    sql("CLEAN FILES FOR TABLE test.INDEXTABLE4")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE4 options('force'='true')")
     val postDeleteSegmentsIndexThree = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE3").count()
     val postDeleteSegmentsIndexFour = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE4").count()
 
@@ -252,6 +256,8 @@ class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists indextable1 on maintable")
     sql("drop index if exists indextable2 on maintable")
     sql("drop table if exists maintable")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
   }
 
 }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index b8f0e76..286dea5 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -283,6 +283,12 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
       indexTable.getMetadataPath + CarbonCommonConstants.FILE_SEPARATOR +
       CarbonTablePath.TABLE_STATUS_FILE,
       loadMetadataDetailsList)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE ud_index1  OPTIONS('stale_inprogress'='true','force'='true')")
+        .show()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
 
     sql(s"""ALTER TABLE default.ud_index1 SET
            |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 480e357..160cdd1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.api
 
-import java.io.{DataInputStream, File, FileNotFoundException, InputStreamReader}
+import java.io.{DataInputStream, FileNotFoundException, InputStreamReader}
 import java.time.{Duration, Instant}
 import java.util
 import java.util.{Collections, Comparator}
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager, StageInput}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -304,9 +305,19 @@ object CarbonStore {
       tablePath: String,
       carbonTable: CarbonTable,
       forceTableClean: Boolean,
+      isForceDelete: Boolean,
+      cleanStaleInprogress: Boolean,
       currentTablePartitions: Option[Seq[PartitionSpec]] = None,
       truncateTable: Boolean = false): Unit = {
-  var carbonCleanFilesLock: ICarbonLock = null
+    // CleanFiles API is also exposed to the user, if they call this API with isForceDelete = true,
+    // need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+          " recovered. It is disabled by default, to enable clean files with force option," +
+          " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
     val absoluteTableIdentifier = if (forceTableClean) {
       AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
     } else {
@@ -328,8 +339,8 @@ object CarbonStore {
         if (truncateTable) {
           SegmentStatusManager.truncateTable(carbonTable)
         }
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+          isForceDelete, currentTablePartitions.map(_.asJava).orNull, cleanStaleInprogress, true)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
         currentTablePartitions match {
           case Some(partitions) =>
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
index b7e9c20..509e166 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
@@ -34,5 +34,6 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSessi
  * @param carbonTable
  * @param sparkSession
  */
-case class CleanFilesPostEvent(carbonTable: CarbonTable, sparkSession: SparkSession)
+case class CleanFilesPostEvent(carbonTable: CarbonTable, cleanStaleInProgress: Boolean,
+    ifForceDelete: Boolean, sparkSession: SparkSession)
   extends Event with CleanFilesEventInfo
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index af23001..b355c07 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -595,7 +595,8 @@ object CommonUtil {
                 try {
                   val carbonTable = CarbonMetadata.getInstance
                     .getCarbonTable(tableUniqueName)
-                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, false, null,
+                    true, true)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index f202a17..70b766b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -66,7 +66,7 @@ object MVRefresher {
     val viewIdentifier = viewSchema.getIdentifier
     val viewTableIdentifier = viewTable.getAbsoluteTableIdentifier
     // Clean up the old invalid segment data before creating a new entry for new load.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(viewTable, false, null)
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(viewTable, false, null, false, false)
     val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(viewTableIdentifier)
     // Acquire table status lock to handle concurrent data loading
     val lock: ICarbonLock = segmentStatusManager.getTableStatusLock
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index fe8f98b..c0ca47a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -58,8 +58,12 @@ case class CarbonCleanFilesCommand(
   var carbonTable: CarbonTable = _
   var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
   val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
-  // forceClean will empty trash
+  // forceClean will clean the MFD and Compacted segments immediately and also empty the trash
+  // folder
   val forceClean = optionsMap.getOrElse("force", "false").toBoolean
+  // stale_inprogress will clean the In Progress segments based on retention time and it will
+  // clean immediately when force is true
+  val staleInprogress = optionsMap.getOrElse("stale_inprogress", "false").toBoolean
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
@@ -117,9 +121,9 @@ case class CarbonCleanFilesCommand(
         CleanFilesUtil.cleanStaleSegments(carbonTable)
       }
       if (forceTableClean) {
-        deleteAllData(sparkSession, databaseNameOp, tableName.get)
+        deleteAllData(sparkSession, databaseNameOp, tableName.get, forceClean, staleInprogress)
       } else {
-        cleanGarbageData(sparkSession, databaseNameOp, tableName.get)
+        cleanGarbageData(sparkSession, databaseNameOp, tableName.get, forceClean, staleInprogress)
       }
     } else {
       cleanGarbageDataInAllTables(sparkSession)
@@ -128,13 +132,14 @@ case class CarbonCleanFilesCommand(
       cleanFileCommands.foreach(_.processData(sparkSession))
     }
     val cleanFilesPostEvent: CleanFilesPostEvent =
-      CleanFilesPostEvent(carbonTable, sparkSession)
+      CleanFilesPostEvent(carbonTable, staleInprogress, forceClean, sparkSession)
     OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
     Seq.empty
   }
 
   private def deleteAllData(sparkSession: SparkSession,
-      databaseNameOp: Option[String], tableName: String): Unit = {
+      databaseNameOp: Option[String], tableName: String, isForceDelete: Boolean,
+      cleanStaleInprogress: Boolean): Unit = {
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
@@ -143,11 +148,14 @@ case class CarbonCleanFilesCommand(
       tableName = tableName,
       tablePath = tablePath,
       carbonTable = null, // in case of delete all data carbonTable is not required.
-      forceTableClean = forceTableClean)
+      forceTableClean = forceTableClean,
+      isForceDelete = isForceDelete,
+      cleanStaleInprogress = cleanStaleInprogress)
   }
 
   private def cleanGarbageData(sparkSession: SparkSession,
-      databaseNameOp: Option[String], tableName: String): Unit = {
+      databaseNameOp: Option[String], tableName: String, isForceDelete: Boolean,
+      cleanStaleInprogress: Boolean): Unit = {
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
@@ -161,6 +169,8 @@ case class CarbonCleanFilesCommand(
       tablePath = carbonTable.getTablePath,
       carbonTable = carbonTable,
       forceTableClean = forceTableClean,
+      isForceDelete = isForceDelete,
+      cleanStaleInprogress = cleanStaleInprogress,
       currentTablePartitions = partitions,
       truncateTable = truncateTable)
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
index 75d319e..b114191 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
@@ -108,7 +108,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
           operationContext = operationContext)
 
       // Clean up the old invalid segment data before creating a new entry for new load.
-      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions, false, false)
       // add the start entry for the new load in the table status file
       if ((updateModel.isEmpty || updateModel.isDefined)
           && !table.isHivePartitionTable) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index d5c3c84..65641b3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -125,7 +125,7 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
           updateModel = None,
           operationContext = operationContext)
       // Clean up the old invalid segment data before creating a new entry for new load.
-      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions, false, false)
       // add the start entry for the new load in the table status file
       if (!table.isHivePartitionTable) {
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index 97f7836..869bd24 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -24,16 +24,14 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{CleanFilesPostEvent, Event, OperationContext, OperationEventListener}
@@ -53,13 +51,16 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
         val carbonTable = cleanFilesPostEvent.carbonTable
         val indexTables = CarbonIndexUtil
           .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
+        val isForceDelete = cleanFilesPostEvent.ifForceDelete
+        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
         indexTables.foreach { indexTable =>
           val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
             Seq.empty[Expression],
             cleanFilesPostEvent.sparkSession,
             indexTable)
           SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-            indexTable, true, partitions.map(_.asJava).orNull)
+              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
+            true)
           CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
           cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
         }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 82c92d2..cdbc711 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -485,7 +485,7 @@ object SecondaryIndexCreator {
       try {
         if (!isCompactionCall) {
           SegmentStatusManager
-            .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null)
+            .deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null, false, false)
         }
       } catch {
         case e: Exception =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index f3426c1..adcfae1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -29,7 +29,10 @@ import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 
 
@@ -170,6 +173,13 @@ class QueryTest extends PlanTest {
     LOGGER.error(sessionParams.asScala.map(x => x._1 + "=" + x._2).mkString(", "))
   }
 
+  def removeSegmentEntryFromTableStatusFile(carbonTable: CarbonTable, segmentNo: String) : Unit = {
+    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      .filter(as => as.getLoadName != segmentNo)
+    SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+      carbonTable.getTablePath), details)
+  }
+
   def printTable(table: String, database: String = "default"): Unit = {
     sql("SELECT current_database()").show(100, false)
     sql(s"describe formatted ${ database }.${ table }").show(100, false)
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
index a9cb652..f07a30e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -39,7 +39,8 @@ object CleanFiles {
    *                        drop table from hive metastore so should be very careful to use it.
    */
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
-     forceTableClean: Boolean = false): Unit = {
+     forceTableClean: Boolean = false, isForceDeletion: Boolean = false,
+     cleanStaleInProgress: Boolean = false ): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
     val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark)
     val carbonTable = if (!forceTableClean) {
@@ -52,7 +53,9 @@ object CleanFiles {
       tableName = tableName,
       tablePath = tablePath,
       carbonTable = carbonTable,
-      forceTableClean = forceTableClean)
+      forceTableClean = forceTableClean,
+      isForceDeletion,
+      cleanStaleInProgress)
   }
 
   def main(args: Array[String]): Unit = {
@@ -65,10 +68,14 @@ object CleanFiles {
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     var forceTableClean = false
-    if (args.length > 2) {
+    var isForceDeletion = false
+    var cleanInprogress = false
+    if (args.length > 4) {
       forceTableClean = args(2).toBoolean
+      isForceDeletion = args(3).toBoolean
+      cleanInprogress = args(4).toBoolean
     }
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    cleanFiles(spark, dbName, tableName, forceTableClean)
+    cleanFiles(spark, dbName, tableName, forceTableClean, isForceDeletion, cleanInprogress)
   }
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
index d9d8e84..8f8d2be 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
@@ -855,7 +855,11 @@ class BloomCoarseGrainIndexFunctionSuite
     }
     // delete and clean the first segment, the corresponding index files should be cleaned too
     sql(s"DELETE FROM TABLE $bloomSampleTable WHERE SEGMENT.ID IN (0)")
-    sql(s"CLEAN FILES FOR TABLE $bloomSampleTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE $bloomSampleTable options('force'='true')")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     var indexPath = CarbonTablePath.getIndexesStorePath(carbonTable.getTablePath, "0", indexName)
     assert(!FileUtils.getFile(indexPath).exists(),
       "index file of this segment has been deleted, should not exist")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 2f37d2d..5d798d9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -1001,7 +1001,11 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         assert(SegmentSequenceIds.length == 8)
 
         // clean files
-        segments = sql("CLEAN FILES FOR TABLE  carbontable")
+        CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+        segments = sql("CLEAN FILES FOR TABLE  carbontable options('force'='true')")
+        CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
         segments = sql("SHOW SEGMENTS FOR TABLE carbontable")
         SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
         assert(SegmentSequenceIds.contains("0.2"))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
index 68e263a..5a90482 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithCompression.scala
@@ -380,7 +380,11 @@ class TestLoadDataWithCompression extends QueryTest with BeforeAndAfterEach with
     checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
     assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 8)
     sql(s"ALTER TABLE $tableName COMPACT 'major'")
-    sql(s"CLEAN FILES FOR TABLE $tableName")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE $tableName options('force'='true')")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     // after compaction and clean, there should be on segment
     checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(4 * 8)))
     assert(sql(s"SHOW SEGMENTS FOR TABLE $tableName").count() == 1)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 1d385cb..0d167fe 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -452,7 +452,11 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert into show_insert select 'abc',1")
     sql("insert overwrite table show_insert select * from show_insert")
     assert(sql("show segments for table show_insert").collect().length == 4)
-    sql("clean files for table show_insert")
+     CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("clean files for table show_insert options('force'='true')")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     assert(sql("show segments for table show_insert").collect().length == 1)
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index 3618274..5615142 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -34,6 +34,29 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
 
   var count = 0
 
+  test("clean up table and test trash folder with IN PROGRESS segments with trash time = 0") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, "0")
+    // do not send the segment folders to trash
+    createTable()
+    loadData()
+    val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
+    editTableStatusFile(path)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
+    assert(segmentNumber1 == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true')").show
+    val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
+    assert(0 == segmentNumber2)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // no carbondata file is added to the trash
+    assert(getFileCountInTrashFolder(trashFolderPath) == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
   test("clean up table and test trash folder with IN PROGRESS segments") {
     // do not send the segment folders to trash
     createTable()
@@ -46,13 +69,19 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
 
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
     assert(segmentNumber1 == 4)
-    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true','force'='true')").show
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
     assert(0 == segmentNumber2)
     assert(!FileFactory.isFileExist(trashFolderPath))
     // no carbondata file is added to the trash
     assert(getFileCountInTrashFolder(trashFolderPath) == 0)
     sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
   }
 
   test("clean up table and test trash folder with Marked For Delete and Compacted segments") {
@@ -67,7 +96,12 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"""Delete from table cleantest where segment.id in(4)""")
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
-    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    sql(s"""show segments for table cleantest""").show()
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
     assert(segmentNumber1 == segmentNumber2 + 5)
     assert(!FileFactory.isFileExist(trashFolderPath))
@@ -81,7 +115,6 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     createTable()
     loadData()
     sql(s"""alter table cleantest compact 'minor'""")
-    sql(s"CLEAN FILES FOR TABLE cleantest").show
     sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2, "name"""")
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(5)))
@@ -89,26 +122,20 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
       .getTablePath
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     assert(!FileFactory.isFileExist(trashFolderPath))
-    // All 4 segments are made as stale segments and should be moved to trash
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "4")
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
     checkAnswer(sql(s"""select count(*) from cleantest"""),
-      Seq(Row(0)))
+      Seq(Row(4)))
     count = 0
     var list = getFileCountInTrashFolder(trashFolderPath)
-    assert(list == 4)
+    assert(list == 2)
     val timeStamp = getTimestampFolderName(trashFolderPath)
     // recovering data from trash folder
-    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
-      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + "0.1"
     val segment4Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
       CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '4'
 
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
-
     sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment4Path'")
     sql("INSERT INTO cleantest select * from c1").show()
     sql("drop table c1")
@@ -120,7 +147,7 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
     count = 0
     list = getFileCountInTrashFolder(trashFolderPath)
-    assert(list == 4)
+    assert(list == 2)
 
     intercept[RuntimeException] {
       sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
@@ -151,34 +178,31 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
 
     val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext
       .sparkSession).getTablePath
-    deleteTableStatusFile(mainTablePath)
+    // deleteTableStatusFile(mainTablePath)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "1")
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "2")
     val mainTableTrashFolderPath = CarbonTablePath.getTrashFolderPath(mainTablePath)
 
     assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
     sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
-    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(2)))
     checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
 
     assert(FileFactory.isFileExist(mainTableTrashFolderPath))
 
     count = 0
     var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
-    assert(listMainTable == 8)
+    assert(listMainTable == 4)
 
     // recovering data from trash folder
     val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
-    val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
-      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
     val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
       timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
     val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
       timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
-    val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
-      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3'
 
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
 
     sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
     sql("INSERT INTO cleantest select * from c1").show()
@@ -188,10 +212,6 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     sql("INSERT INTO cleantest select * from c1").show()
     sql("drop table c1")
 
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
-
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(4)))
     intercept[RuntimeException] {
@@ -212,14 +232,15 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
 
   test("test trash folder with 2 segments with same segment number") {
     createTable()
-    sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
+    loadData()
 
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     assert(!FileFactory.isFileExist(trashFolderPath))
     // All 4  segments are made as stale segments, they should be moved to the trash folder
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "3")
 
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
@@ -228,7 +249,8 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     assert(list == 2)
 
     sql(s"""INSERT INTO CLEANTEST SELECT "1", 2, "name"""")
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "3")
 
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
     count = 0
@@ -262,17 +284,17 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
       .getTablePath
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     assert(!FileFactory.isFileExist(trashFolderPath))
-    // All 4 segments are made as stale segments and should be moved to trash
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "1")
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "2")
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
     checkAnswer(sql(s"""select count(*) from cleantest"""),
-      Seq(Row(0)))
+      Seq(Row(2)))
     count = 0
     var list = getFileCountInTrashFolder(trashFolderPath)
-    assert(list == 8)
-    val timeStamp = getTimestampFolderName(trashFolderPath)
-
+    assert(list == 4)
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
     count = 0
     list = getFileCountInTrashFolder(trashFolderPath)
@@ -324,12 +346,6 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     timeStampList.get(0).getName
   }
 
-  def deleteTableStatusFile(carbonTablePath: String) : Unit = {
-    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" +
-      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
-    f1.delete()
-  }
-
   def createTable() : Unit = {
     sql("""DROP TABLE IF EXISTS CLEANTEST""")
     sql(
@@ -345,5 +361,4 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
     sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
     sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
   }
-
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 8954b12..06ac6f8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -45,7 +45,11 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     assert(!FileFactory.isFileExist(trashFolderPath))
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
     assert(segmentNumber1 == 4)
-    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('stale_inprogress'='true','force'='true')").show
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
     assert(0 == segmentNumber2)
     assert(!FileFactory.isFileExist(trashFolderPath))
@@ -67,7 +71,11 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"""Delete from table cleantest where segment.id in(4)""")
     val segmentNumber1 = sql(s"""show segments for table cleantest""").count()
-    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val segmentNumber2 = sql(s"""show segments for table cleantest""").count()
     assert(segmentNumber1 == segmentNumber2 + 5)
     assert(!FileFactory.isFileExist(trashFolderPath))
@@ -80,13 +88,13 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
 
   test("test trash folder with 2 segments with same segment number") {
     createParitionTable()
-    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
-
+    loadData()
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
     val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath.TRASH_DIR
     assert(!FileFactory.isFileExist(trashFolderPath))
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "3")
 
     assert(!FileFactory.isFileExist(trashFolderPath))
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
@@ -95,7 +103,8 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     assert(list == 2)
 
     sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"hello","abc"""")
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "3")
 
     sql(s"CLEAN FILES FOR TABLE cleantest").show()
     count = 0
@@ -124,20 +133,17 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
-    // All 4  segments are made as stale segments, they should be moved to the trash folder
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+      sqlContext.sparkSession), "1")
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+      sqlContext.sparkSession), "2")
 
     sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
     checkAnswer(sql(s"""select count(*) from cleantest"""),
-      Seq(Row(0)))
+      Seq(Row(2)))
 
     val timeStamp = getTimestampFolderName(trashFolderPath)
     // test recovery from partition table
-    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
-      "/Segment_0"
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
 
     val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
       "/Segment_1"
@@ -151,12 +157,6 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql("INSERT INTO cleantest select * from c1").show()
     sql("drop table c1")
 
-    val segment3Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
-      "/Segment_3"
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
-
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(4)))
 
@@ -179,19 +179,15 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
-    // All 4  segments are made as stale segments, they should be moved to the trash folder
-    // createStaleSegments(path)
-    deleteTableStatusFile(path)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+      sqlContext.sparkSession), "1")
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+      sqlContext.sparkSession), "2")
 
     sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
 
     val timeStamp = getTimestampFolderName(trashFolderPath)
     // test recovery from partition table
-    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
-      "/Segment_0"
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
 
     val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR + timeStamp +
       "/Segment_1"
@@ -225,37 +221,32 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
 
     val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext
       .sparkSession).getTablePath
-    deleteTableStatusFile(mainTablePath)
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "1")
+    removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
+        sqlContext.sparkSession), "2")
     val mainTableTrashFolderPath = mainTablePath + CarbonCommonConstants.FILE_SEPARATOR +
       CarbonTablePath.TRASH_DIR
 
     assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
 
     sql(s"CLEAN FILES FOR TABLE CLEANTEST").show()
-    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from cleantest"""), Seq(Row(2)))
     checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(4)))
 
     assert(FileFactory.isFileExist(mainTableTrashFolderPath))
 
     count = 0
     var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
-    assert(listMainTable == 8)
+    assert(listMainTable == 4)
 
     // recovering data from trash folder
     val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
 
-    val segment0Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
-      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '0'
     val segment1Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
       timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '1'
     val segment2Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
       timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '2'
-    val segment3Path = mainTableTrashFolderPath + CarbonCommonConstants.FILE_SEPARATOR +
-      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER + '3'
-
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment0Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
 
     sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment1Path'")
     sql("INSERT INTO cleantest select * from c1").show()
@@ -265,10 +256,6 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql("INSERT INTO cleantest select * from c1").show()
     sql("drop table c1")
 
-    sql(s"CREATE TABLE c1 USING CARBON LOCATION '$segment3Path'")
-    sql("INSERT INTO cleantest select * from c1").show()
-    sql("drop table c1")
-
     checkAnswer(sql(s"""select count(*) from cleantest"""),
       Seq(Row(4)))
     intercept[RuntimeException] {
@@ -329,12 +316,6 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     timeStampList.get(0).getName
   }
 
-  def deleteTableStatusFile(carbonTablePath: String) : Unit = {
-    val f1 = new File(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "Metadata" +
-      CarbonCommonConstants.FILE_SEPARATOR + "tablestatus")  // Original File
-    f1.delete()
-  }
-
   def createParitionTable() : Unit = {
     sql("""DROP TABLE IF EXISTS CLEANTEST""")
     sql(
@@ -350,4 +331,5 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"johnny","adc"""")
     sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""")
   }
+
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 3af3aec..2ae15dc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -53,9 +53,13 @@ class CompactionSupportGlobalSortFunctionTest
         | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
         | STORED AS carbondata
       """.stripMargin)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
   }
 
   override def afterEach {
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     sql("DROP TABLE IF EXISTS compaction_globalsort")
     sql("DROP TABLE IF EXISTS carbon_localsort")
     resetConf
@@ -255,7 +259,7 @@ class CompactionSupportGlobalSortFunctionTest
 
     sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
-    sql("clean files for table compaction_globalsort")
+    sql("clean files for table compaction_globalsort options('force'='true')")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
 
@@ -290,7 +294,7 @@ class CompactionSupportGlobalSortFunctionTest
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
     sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
-    sql("clean files for table compaction_globalsort")
+    sql("clean files for table compaction_globalsort options('force'='true')")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
 
@@ -324,7 +328,7 @@ class CompactionSupportGlobalSortFunctionTest
 
     sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
-    sql("clean files for table compaction_globalsort")
+    sql("clean files for table compaction_globalsort options('force'='true')")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
 
@@ -361,7 +365,7 @@ class CompactionSupportGlobalSortFunctionTest
 
     sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
-    sql("clean files for table compaction_globalsort")
+    sql("clean files for table compaction_globalsort options('force'='true')")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
 
@@ -394,7 +398,7 @@ class CompactionSupportGlobalSortFunctionTest
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
     sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
-    sql("clean files for table compaction_globalsort")
+    sql("clean files for table compaction_globalsort options('force'='true')")
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
 
     val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
@@ -498,7 +502,7 @@ class CompactionSupportGlobalSortFunctionTest
 
     sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
-    sql("clean files for table compaction_globalsort")
+    sql("clean files for table compaction_globalsort options('force'='true')")
 
     val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
     val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
index 35a8bca..748fdd2 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala
@@ -32,10 +32,14 @@ class TableLevelCompactionOptionTest extends QueryTest
   val sampleFilePath: String = resourcesPath + "/sample.csv"
 
   override def beforeEach {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     cleanTable()
   }
 
   override def afterEach {
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     resetConf()
     cleanTable()
   }
@@ -116,7 +120,7 @@ class TableLevelCompactionOptionTest extends QueryTest
     }
 
     sql("ALTER TABLE carbon_table COMPACT 'MAJOR'")
-    sql("CLEAN FILES FOR TABLE carbon_table")
+    sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
 
     val segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
     val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
@@ -181,7 +185,7 @@ class TableLevelCompactionOptionTest extends QueryTest
     for (i <- 0 until 8) {
       sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
     }
-    sql("CLEAN FILES FOR TABLE carbon_table")
+    sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
     var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
     var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
     assert(segmentSequenceIds.size == 1)
@@ -251,7 +255,7 @@ class TableLevelCompactionOptionTest extends QueryTest
     for (i <- 0 until 6) {
       sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
     }
-    sql("CLEAN FILES FOR TABLE carbon_table")
+    sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
     var segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
     var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
     assert(segmentSequenceIds.contains("0.1"))
@@ -271,7 +275,7 @@ class TableLevelCompactionOptionTest extends QueryTest
     for (i <- 0 until 2) {
       sql(s"LOAD DATA LOCAL INPATH '$sampleFilePath' INTO TABLE carbon_table")
     }
-    sql("CLEAN FILES FOR TABLE carbon_table")
+    sql("CLEAN FILES FOR TABLE carbon_table options('force'='true')")
     segments = sql("SHOW SEGMENTS FOR TABLE carbon_table")
     segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
     assert(segmentSequenceIds.contains("0.2"))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index 1361dac..c6306c0 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -29,7 +29,8 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
   // scalastyle:off lineLength
   override def beforeAll {
     dropTable
-
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     sql(
@@ -102,7 +103,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 5)
-    sql("clean files for table t1")
+    sql("clean files for table t1 options('force'='true')")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 4)
@@ -111,7 +112,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 5)
-    sql("clean files for table t1")
+    sql("clean files for table t1 options('force'='true')")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath)
              .listFiles()
              .count(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) == 1)
@@ -137,7 +138,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4)
     sql("Alter table flatfolder_delete compact 'minor'")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4)
-    sql("clean files for table flatfolder_delete")
+    sql("clean files for table flatfolder_delete options('force'='true')")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
     sql("drop table if exists flatfolder_delete")
@@ -166,7 +167,7 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("Alter table flatfolder_delete compact 'minor'")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
              .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 8)
-    sql("clean files for table flatfolder_delete")
+    sql("clean files for table flatfolder_delete options('force'='true')")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
              .filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
@@ -176,6 +177,8 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       .addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 14aa542..b6d530c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -43,6 +43,8 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
         |AS carbondata""".stripMargin)
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/source2.csv' INTO table iud_db.source2""")
     sql("use iud_db")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
   }
 
   test("delete data from carbon table with alias [where clause ]") {
@@ -201,9 +203,9 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert into select_after_clean select 3,'uhj'")
     sql("insert into select_after_clean select 4,'frg'")
     sql("alter table select_after_clean compact 'minor'")
-    sql("clean files for table select_after_clean")
+    sql("clean files for table select_after_clean options('force'='true')")
     sql("delete from select_after_clean where name='def'")
-    sql("clean files for table select_after_clean")
+    sql("clean files for table select_after_clean options('force'='true')")
     assertResult(false)(new File(
       CarbonTablePath.getSegmentPath(s"$storeLocation/iud_db.db/select_after_clean", "0")).exists())
     checkAnswer(sql("""select * from select_after_clean"""),
@@ -469,5 +471,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud_db cascade")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
   }
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
index 3a280ce..d15d9c6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
@@ -22,14 +22,26 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * Test Class for SHOW SEGMENTS command
  */
 class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
 
+  override def beforeAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+  }
+
+  override def afterAll {
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+  }
+
   test("test show segment by query, success case") {
     sql("drop table if exists source")
     sql(
@@ -173,7 +185,7 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     var historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
     assert(detail.length == 10)
     assert(historyDetail.length == 0)
-    sql(s"clean files for table ${tableName}")
+    sql(s"clean files for table ${tableName} options('force'='true')")
     assert(sql(s"show segments on ${tableName}").collect().length == 2)
     assert(sql(s"show segments on ${tableName} limit 1").collect().length == 1)
     detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
@@ -193,13 +205,19 @@ class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     assert(sql(s"show segments on ${ tableName } as select * from ${ tableName }_segments")
              .collect()
              .length == 10)
+    sql(s"show segments on ${ tableName } as select * from ${ tableName }_segments")
+      .show()
     assert(sql(s"show history segments on ${ tableName } as select * from ${ tableName }_segments")
              .collect()
              .length == 10)
-    sql(s"clean files for table ${tableName}")
+    sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").show()
+    sql(s"clean files for table ${tableName} options('force'='true')")
     assert(sql(s"show segments on ${ tableName } as select * from ${ tableName }_segments")
              .collect()
              .length == 2)
+    sql(s"show segments on ${ tableName } as select * from ${ tableName }_segments")
+      .show()
+    sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").show()
     sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").collect()
     var segmentsHistoryList = sql(s"show history segments on ${ tableName } " +
                                   s"as select * from ${ tableName }_segments")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
index b5cbfb5..1aeea03 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.{CarbonUtils, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * Testcase for set segment in multhread env
@@ -52,6 +54,8 @@ class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterA
     sql(
       s"LOAD DATA LOCAL INPATH '$resourcesPath/data1.csv' INTO TABLE carbon_table_MulTI_THread " +
       "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
   }
 
   test("test multithreading for segment reading") {
@@ -91,5 +95,7 @@ class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterA
   override def afterAll: Unit = {
     sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
     CarbonUtils.threadUnset("carbon.input.segments.default.carbon_table_MulTI_THread")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
   }
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 4d3c0f3..f771440 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -47,6 +47,8 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
       """.stripMargin)
 
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
   }
 
   test("test global sort column as partition column") {
@@ -942,7 +944,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     assert(sql("select * from comp_dt2").collect().length == 4)
     sql("Alter table comp_dt2 compact 'minor'")
     assert(sql("select * from comp_dt2").collect().length == 4)
-    sql("clean files for table comp_dt2")
+    sql("clean files for table comp_dt2 options('force'='true')")
     assert(sql("select * from comp_dt2").collect().length == 4)
     sql("insert into comp_dt2 select 5,'E','2003-01-01',3")
     sql("insert into comp_dt2 select 6,'F','2003-01-01',3")
@@ -950,7 +952,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql("insert into comp_dt2 select 8,'H','2004-01-01',''")
     assert(sql("select * from comp_dt2").collect().length == 8)
     sql("Alter table comp_dt2 compact 'minor'")
-    sql("clean files for table comp_dt2")
+    sql("clean files for table comp_dt2 options('force'='true')")
     assert(sql("select * from comp_dt2").collect().length == 8)
     assert(sql("select * from comp_dt2").collect().length == 8)
     sql("insert into comp_dt2 select 9,'H','2001-01-01',1")
@@ -961,7 +963,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     sql("Alter table comp_dt2 compact 'minor'")
     assert(sql("show segments for table comp_dt2").collect().length == 8)
     assert(sql("select * from comp_dt2").collect().length == 12)
-    sql("clean files for table comp_dt2")
+    sql("clean files for table comp_dt2 options('force'='true')")
     assert(sql("select * from comp_dt2").collect().length == 12)
     sql("insert into comp_dt2 select 13,'L','2004-01-01', 6")
     assert(sql("select * from comp_dt2").collect().length == 13)
@@ -969,7 +971,7 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     assert(sql("select * from comp_dt2").collect().length == 13)
     assert(sql("show segments for table comp_dt2").collect().length == 3)
     assert(sql("select * from comp_dt2").collect().length == 13)
-    sql("clean files for table comp_dt2")
+    sql("clean files for table comp_dt2 options('force'='true')")
     assert(sql("show segments for table comp_dt2").collect().length == 1)
     assert(sql("select * from comp_dt2").collect().length == 13)
   }
@@ -1071,6 +1073,8 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index b9bb98b..cd87bc8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -33,7 +33,8 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
 
   override def beforeAll {
     dropTable
-
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
@@ -195,13 +196,15 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
     sql(s"delete from table partitionalldeleteseg where segment.id in (1)").collect()
     checkExistence(sql(s"show segments for table partitionalldeleteseg"), true, "Marked for Delete")
     checkAnswer(sql(s"Select count(*) from partitionalldeleteseg"), Seq(Row(30)))
-    sql(s"CLEAN FILES FOR TABLE partitionalldeleteseg").collect()
+    sql(s"CLEAN FILES FOR TABLE partitionalldeleteseg options('force'='true')").collect()
     assert(sql(s"show segments for table partitionalldeleteseg").count == 3)
   }
 
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index c59bef9..9d294a5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -30,7 +30,8 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     defaultConfig()
 
     dropTable
-
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     sql(
@@ -166,7 +167,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     for (i <- 0 until 4) {
       sql(s"""insert into staticpartitioncompaction PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
     }
-    sql("CLEAN FILES FOR TABLE staticpartitioncompaction").collect()
+    sql("CLEAN FILES FOR TABLE staticpartitioncompaction options('force'='true')").collect()
     val segments = sql("SHOW SEGMENTS FOR TABLE staticpartitioncompaction")
     val segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
     assert(segmentSequenceIds.size==1)
@@ -210,6 +211,8 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
+    CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
     dropTable
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
index 37c93b9..4c41b3a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/TestPartitionWithMV.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -35,6 +36,8 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll with BeforeAn
 
   override def beforeAll(): Unit = {
     defaultConfig()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
     sql("drop database if exists partition_mv cascade")
     sql("create database partition_mv")
     sql("use partition_mv")
@@ -54,6 +57,8 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll with BeforeAn
   override def afterAll(): Unit = {
     sql("drop database if exists partition_mv cascade")
     sql("use default")
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
   }
 
   override def beforeEach(): Unit = {
@@ -259,7 +264,7 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll with BeforeAn
     sql("insert into droppartition values('k',2,2014,1,1)")
     sql("insert into droppartition values('k',2,2015,2,3)")
     sql("alter table droppartition drop partition(year=2015,month=2,day=3)")
-    sql("clean files for table droppartition")
+    sql("clean files for table droppartition options('force'='true')")
     val table = CarbonEnv.getCarbonTable(Option("partition_mv"), "droppartition")(sqlContext.sparkSession)
     val mvTable = CarbonEnv.getCarbonTable(Option("partition_mv"), "droppartition")(sqlContext.sparkSession)
     val mvtablePath = mvTable.getTablePath
diff --git a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 8012630..4ee1542 100644
--- a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -115,9 +115,14 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
 
   test("clean files") {
     val table = "carbon_table3"
+    dropTable(table)
     createAndLoadTestTable(table, "csv_table")
     DeleteSegmentById.main(Array(s"${location}", table, "0"))
-    CleanFiles.main(Array(s"${location}", table))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    CleanFiles.main(Array(s"${location}", table, "false", "true", "true"))
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
     val tablePath = carbonTable.getAbsoluteTableIdentifier.getTablePath
     val f = new File(s"$tablePath/Fact/Part0")
@@ -132,7 +137,7 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll {
     val table = "carbon_table4"
     dropTable(table)
     createAndLoadTestTable(table, "csv_table")
-    CleanFiles.main(Array(s"${location}", table, "true"))
+    CleanFiles.main(Array(s"${location}", table, "true", "false", "false"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
     val tablePath = carbonTable.getTablePath
     val f = new File(tablePath)


Mime
View raw message