carbondata-commits mailing list archives

From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4081] Fix multiple issues with clean files command
Date Thu, 17 Dec 2020 11:27:02 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aafb6b  [CARBONDATA-4081] Fix multiple issues with clean files command
7aafb6b is described below

commit 7aafb6b769b175cb3ca9ed6fbd7e67b2b55b52b5
Author: Vikram Ahuja <vikramahuja8803@gmail.com>
AuthorDate: Thu Dec 10 14:38:17 2020 +0530

    [CARBONDATA-4081] Fix multiple issues with clean files command
    
    Why is this PR needed?
    While getting stale segments, we list the segment files location and take every entry.
    If any folder or file other than a .segment file is present, it leads to further issues
    while copying data to the trash folder.
    
    When AbstractDFSCarbonFile is created with a path and a Hadoop configuration instead of
    a fileStatus, and the file does not exist, fileStatus is empty: listDirs returns an
    empty result and getAbsolutePath throws a file-does-not-exist exception.
    
    Clean files is not allowed while a concurrent insert overwrite is in progress. Since
    concurrent loading to an MV is by default an insert overwrite operation, a clean files
    operation on that MV would fail and throw an exception.
    
    What changes were proposed in this PR?
    Added a filter to only consider files ending with ".segment" (sketched below)
    Used listFiles instead of listDirs on the trash folder
    Instead of throwing an exception, log an info message for such MV tables and continue
    blocking clean files for them
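
    As a rough illustration of the filter fix, here is a minimal, self-contained sketch
    that uses java.io.File in place of the CarbonFile abstraction (the class name and
    SEGMENT_EXT constant are illustrative stand-ins, not CarbonData API):

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SegmentFileFilterSketch {
      // stand-in for CarbonTablePath.SEGMENT_EXT
      static final String SEGMENT_EXT = ".segment";

      static List<String> listSegmentFiles(String segmentFilesLocation) {
        File[] entries = new File(segmentFilesLocation).listFiles();
        if (entries == null) {
          return Collections.emptyList(); // location missing or not a directory
        }
        return Arrays.stream(entries)
            .map(File::getName)
            // drop stray entries such as "_.tmp" so only real segment files survive
            .filter(name -> name.endsWith(SEGMENT_EXT))
            .collect(Collectors.toList());
      }
    }
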
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4051
---
 .../apache/carbondata/core/util/CleanFilesUtil.java   |  3 ++-
 .../org/apache/carbondata/core/util/TrashUtil.java    | 19 +++++++++++--------
 docs/clean-files.md                                   |  3 +++
 .../command/management/CarbonCleanFilesCommand.scala  | 11 ++++++++++-
 .../TestCleanFilesCommandPartitionTable.scala         | 11 +++++++++++
 5 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index 3a817a0..9889fe3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -154,7 +154,8 @@ public class CleanFilesUtil {
     String segmentFilesLocation =
         CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath());
     List<String> segmentFiles = Arrays.stream(FileFactory.getCarbonFile(segmentFilesLocation)
-        .listFiles()).map(CarbonFile::getName).collect(Collectors.toList());
+        .listFiles()).map(CarbonFile::getName).filter(segmentFileName -> segmentFileName
+        .endsWith(CarbonTablePath.SEGMENT_EXT)).collect(Collectors.toList());
     // there are no segments present in the Metadata folder. Can return here
     if (segmentFiles.size() == 0) {
       return;
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index c9db279..ae5476d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -111,7 +111,7 @@ public final class TrashUtil {
         LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " has been copied to" +
             " the trash folder successfully. Total files copied: " + dataFiles.length);
       } else {
-        LOGGER.info("Segment: " + segmentPath.getAbsolutePath() + " does not exist");
+        LOGGER.info("Segment: " + segmentPath.getName() + " does not exist");
       }
     } catch (IOException e) {
       LOGGER.error("Error while copying the segment: " + segmentPath.getName() + " to the
trash" +
@@ -153,15 +153,17 @@ public final class TrashUtil {
    * per the user defined retention time
    */
   public static void deleteExpiredDataFromTrash(String tablePath) {
-    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
+        .getTrashFolderPath(tablePath));
     // Deleting the timestamp based subdirectories in the trashfolder by the given timestamp.
     try {
-      if (FileFactory.isFileExist(trashPath)) {
-        List<CarbonFile> timestampFolderList = FileFactory.getFolderList(trashPath);
+      if (trashFolder.isFileExist()) {
+        CarbonFile[] timestampFolderList = trashFolder.listFiles();
         for (CarbonFile timestampFolder : timestampFolderList) {
          // If the timeStamp at which the timeStamp subdirectory has expired as per the user
          // defined value, delete the complete timeStamp subdirectory
-          if (isTrashRetentionTimeoutExceeded(Long.parseLong(timestampFolder.getName()))) {
+          if (timestampFolder.isDirectory() && isTrashRetentionTimeoutExceeded(Long
+              .parseLong(timestampFolder.getName()))) {
             FileFactory.deleteAllCarbonFilesOfDir(timestampFolder);
             LOGGER.info("Timestamp subfolder from the Trash folder deleted: " + timestampFolder
                 .getAbsolutePath());
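
For the retention pass above, the decision per trash entry reduces to: skip anything that
is not a directory, then compare the timestamp encoded in the folder name against the
retention window. A hedged restatement with java.io.File (names are illustrative; the real
window comes from carbon.trash.retention.days, and trash subdirectories are assumed to be
timestamp-named, as CarbonData creates them):

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    public class TrashRetentionSketch {
      // assumed default retention window of 7 days
      static final long RETENTION_MILLIS = TimeUnit.DAYS.toMillis(7);

      static boolean isTrashRetentionTimeoutExceeded(long createdAtMillis) {
        return System.currentTimeMillis() - createdAtMillis > RETENTION_MILLIS;
      }

      static void deleteExpiredTimestampFolders(File trashFolder) {
        File[] entries = trashFolder.listFiles();
        if (entries == null) {
          return; // trash folder absent: nothing to do, and no exception thrown
        }
        for (File entry : entries) {
          // the isDirectory() guard keeps stray files from reaching Long.parseLong
          if (entry.isDirectory()
              && isTrashRetentionTimeoutExceeded(Long.parseLong(entry.getName()))) {
            // recursive delete here in the real code (FileFactory.deleteAllCarbonFilesOfDir)
          }
        }
      }
    }
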
@@ -177,11 +179,12 @@ public final class TrashUtil {
    * The below method deletes all the files and folders in the trash folder of a carbon table.
    */
   public static void emptyTrash(String tablePath) {
-    String trashPath = CarbonTablePath.getTrashFolderPath(tablePath);
+    CarbonFile trashFolder = FileFactory.getCarbonFile(CarbonTablePath
+        .getTrashFolderPath(tablePath));
     // if the trash folder exists delete the contents of the trash folder
     try {
-      if (FileFactory.isFileExist(trashPath)) {
-        List<CarbonFile> carbonFileList = FileFactory.getFolderList(trashPath);
+      if (trashFolder.isFileExist()) {
+        CarbonFile[] carbonFileList = trashFolder.listFiles();
         for (CarbonFile carbonFile : carbonFileList) {
           FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
         }
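
emptyTrash follows the same defensive pattern: check existence on the folder object first,
then list and delete, so a missing trash folder no longer hits the getAbsolutePath failure
described above. A minimal java.io.File sketch (the recursive delete is simplified):

    import java.io.File;

    public class EmptyTrashSketch {
      static void emptyTrash(File trashFolder) {
        if (!trashFolder.exists()) {
          return; // guard first: listing a non-existent path is what used to fail
        }
        File[] entries = trashFolder.listFiles();
        if (entries == null) {
          return;
        }
        for (File entry : entries) {
          deleteRecursively(entry);
        }
      }

      static void deleteRecursively(File file) {
        File[] children = file.listFiles();
        if (children != null) {
          for (File child : children) {
            deleteRecursively(child);
          }
        }
        file.delete();
      }
    }
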
diff --git a/docs/clean-files.md b/docs/clean-files.md
index baf81db..dbe611b 100644
--- a/docs/clean-files.md
+++ b/docs/clean-files.md
@@ -26,6 +26,9 @@ Clean files command is used to remove the Compacted, Marked For Delete ,In Progr
    ```
 The above clean files command will clean Marked For Delete and Compacted segments depending on ```max.query.execution.time``` (default 1 hr) and ``` carbon.trash.retention.days``` (default 7 days). It will also delete the timestamp subdirectories from the trash folder after the expiration day (default 7 days, can be configured).
 
+**NOTE**:
+  * The clean files operation is not supported on non-transactional tables.
+  * The clean files operation is not supported on tables with a concurrent insert overwrite operation in progress.
 
 ### TRASH FOLDER
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 3f956e2..c51efef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events._
@@ -38,13 +40,20 @@ case class CarbonCleanFilesCommand(
     isInternalCleanCall: Boolean = false)
   extends DataCommand {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     setAuditTable(carbonTable)
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      if ((carbonTable.isMV && !isInternalCleanCall) || !carbonTable.isMV) {
+        throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+      } else if (carbonTable.isMV && isInternalCleanCall) {
+        LOGGER.info(s"Clean files not allowed on table: {${carbonTable.getTableName}}")
+        return Seq.empty
+      }
     }
     if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
index 06ac6f8..c494404 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFilesCommandPartitionTable.scala
@@ -132,6 +132,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     loadData()
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
+    addRandomFiles(path)
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "1")
@@ -178,6 +179,7 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
 
     val path = CarbonEnv.getCarbonTable(Some("default"), "cleantest")(sqlContext.sparkSession)
       .getTablePath
+    addRandomFiles(path)
     val trashFolderPath = CarbonTablePath.getTrashFolderPath(path)
     removeSegmentEntryFromTableStatusFile(CarbonEnv.getCarbonTable(Some("default"), "cleantest")(
       sqlContext.sparkSession), "1")
@@ -332,4 +334,13 @@ class TestCleanFilesCommandPartitionTable extends QueryTest with BeforeAndAfterA
     sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"Reddit","adc"""")
   }
 
+  def addRandomFiles(carbonTablePath: String): Unit = {
+    val f1 = CarbonTablePath.getSegmentFilesLocation(carbonTablePath) +
+      CarbonCommonConstants.FILE_SEPARATOR + "_.tmp"
+    val f2 = CarbonTablePath.getSegmentFilesLocation(carbonTablePath) +
+      CarbonCommonConstants.FILE_SEPARATOR + "1_.tmp"
+    FileFactory.createNewFile(f1)
+    FileFactory.createNewFile(f2)
+  }
+
 }

