carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3667] Insert stage recover processing of the partition table throw exception "the unexpected 0 segment found"
Date Tue, 28 Jan 2020 09:00:48 GMT
This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b332d90  [CARBONDATA-3667] Insert stage recover processing of the partition table
throw exception "the unexpected 0 segment found"
b332d90 is described below

commit b332d908846a31aac7ee9fe196802dcfe4346b31
Author: h00424960 <haoxingjun@huawei.com>
AuthorDate: Sun Jan 26 21:29:11 2020 +0800

    [CARBONDATA-3667] Insert stage recover processing of the partition table throw exception
"the unexpected 0 segment found"
    
    Why is this PR needed?
      Recover processing when executing INSERT STAGE on a partition table throws the exception
      "the unexpected 0 segment found".
      The reason is that the snapshot content of a partition table is wrong.
    
    What changes were proposed in this PR?
      Let's review the purpose of the recover processing: when a segment is loaded successfully
      but its stage files fail to be cleaned, the stage (data) will be loaded again, which
      duplicates the data. Previously we saved a snapshot file consisting of the segment id and
      the stage file list, so that stage files that failed to be cleaned could be cleaned again
      once the corresponding segment was found to have been loaded successfully. But the segment
      id cannot be obtained while loading a partition table; in other words, [...]
      After discussion, we believe that this problem can also be alleviated by adding a retry
      mechanism when deleting stage files. Thus, two changes are proposed:
      1) Remove the recover processing of the INSERT STAGE command for partition tables.
      2) Retry deleting the stage files (and the snapshot file).
    
    Does this PR introduce any user interface change?
      No
    
    Is any new testcase added?
      No
    
    This closes #3589
---
 .../management/CarbonInsertFromStageCommand.scala  | 96 +++++++++++++++++-----
 1 file changed, 77 insertions(+), 19 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0f5c4ae..fe3175a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.management
 import java.io.{InputStreamReader, IOException}
 import java.util
 import java.util.Collections
-import java.util.concurrent.{Executors, ExecutorService}
+import java.util.concurrent.{Callable, Executors, ExecutorService}
 
 import scala.collection.JavaConverters._
 
@@ -62,6 +62,8 @@ case class CarbonInsertFromStageCommand(
 
   @transient var LOGGER: Logger = _
 
+  val DELETE_FILES_RETRY_TIMES = 3
+
   override def processData(spark: SparkSession): Seq[Row] = {
     LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     Checker.validateTableExists(databaseNameOp, tableName, spark)
@@ -144,10 +146,10 @@ case class CarbonInsertFromStageCommand(
       }
 
       // 4) delete stage files
-      deleteStageFiles(executorService, stageFiles)
+      deleteStageFilesWithRetry(executorService, stageFiles)
 
       // 5) delete the snapshot file
-      FileFactory.getCarbonFile(snapshotFilePath).delete()
+      deleteSnapShotFileWithRetry(table, snapshotFilePath)
     } catch {
       case ex: Throwable =>
         LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}",
ex)
@@ -165,7 +167,7 @@ case class CarbonInsertFromStageCommand(
       snapshotFilePath: String,
       table: CarbonTable,
       conf: Configuration): Unit = {
-    if (!FileFactory.isFileExist(snapshotFilePath)) {
+    if (!FileFactory.isFileExist(snapshotFilePath) || table.isHivePartitionTable) {
       // everything is fine
       return
     }
@@ -189,7 +191,7 @@ case class CarbonInsertFromStageCommand(
       table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
     if (!lock.lockWithRetries()) {
       throw new RuntimeException(s"Failed to lock table status for " +
-                                 s"${table.getDatabaseName}.${table.getTableName}")
+        s"${table.getDatabaseName}.${table.getTableName}")
     }
     try {
       val segments = SegmentStatusManager.readTableStatusFile(
@@ -205,7 +207,7 @@ case class CarbonInsertFromStageCommand(
           lock.unlock()
           lock = null
           LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to delete " +
-                      s"${stageFileNames.length} stage files")
+            s"${stageFileNames.length} stage files")
           val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10)
           val executorService = Executors.newFixedThreadPool(numThreads)
           stageFileNames.map { fileName =>
@@ -213,7 +215,7 @@ case class CarbonInsertFromStageCommand(
               override def run(): Unit = {
                 FileFactory.getCarbonFile(
                   CarbonTablePath.getStageDir(table.getTablePath) +
-                  CarbonCommonConstants.FILE_SEPARATOR + fileName
+                    CarbonCommonConstants.FILE_SEPARATOR + fileName
                 ).delete()
               }
             })
@@ -223,7 +225,7 @@ case class CarbonInsertFromStageCommand(
         case other =>
           // delete entry in table status and load again
           LOGGER.warn(s"Segment $segmentId is in $other state, about to delete the " +
-                      s"segment entry and load again")
+            s"segment entry and load again")
           val segmentToWrite = segments.filterNot(_.getLoadName.equals(segmentId))
           SegmentStatusManager.writeLoadDetailsIntoFile(
             CarbonTablePath.getTableStatusFilePath(table.getTablePath),
@@ -307,9 +309,6 @@ case class CarbonInsertFromStageCommand(
     ): Unit = {
     val partitionDataList = listPartitionFiles(stageInput)
 
-    val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
-    FileFactory.writeFile(content, snapshotFilePath)
-
     val start = System.currentTimeMillis()
     partitionDataList.map {
       case (partition, splits) =>
@@ -414,23 +413,82 @@ case class CarbonInsertFromStageCommand(
 
   /**
    * Delete stage file and success file
+   * in parallel on `executorService`.
+   * Return the pairs that could not be cleaned (empty if all were removed);
+   * a file that no longer exists after delete() counts as cleaned.
     */
   private def deleteStageFiles(
       executorService: ExecutorService,
-      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
-    val startTime = System.currentTimeMillis()
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
     stageFiles.map { files =>
-      executorService.submit(new Runnable {
-        override def run(): Unit = {
-          files._1.delete()
-          files._2.delete()
+      executorService.submit(new Callable[Option[(CarbonFile, CarbonFile)]] {
+        override def call(): Option[(CarbonFile, CarbonFile)] = {
+          // Delete both files independently: chaining with `&&` would skip the success
+          // file whenever the stage file's delete() returns false (e.g. already removed).
+          val stageFileDeleted = files._1.delete()
+          val successFileDeleted = files._2.delete()
+          // delete() may also return false for a file that does not exist, which counts
+          // as cleaned, so only report the pair if one of its files still exists.
+          val failed = !(stageFileDeleted && successFileDeleted) &&
+            (files._1.exists() || files._2.exists())
+          if (failed) Some(files) else None
         }
       })
-    }.map { future =>
+    }.flatMap { future =>
       future.get()
     }
+  }
+
+  /**
+   * Delete stage file and success file with retry.
+   * Pairs still present after the first attempt are retried up to
+   * DELETE_FILES_RETRY_TIMES more times; any left over are only logged.
+   */
+  private def deleteStageFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var remaining = deleteStageFiles(executorService, stageFiles)
+    var retry = DELETE_FILES_RETRY_TIMES
+    while (remaining.nonEmpty && retry > 0) {
+      remaining = deleteStageFiles(executorService, remaining)
+      retry -= 1
+    }
     LOGGER.info(s"finished to delete stage files, time taken: " +
-                s"${System.currentTimeMillis() - startTime}ms")
+      s"${System.currentTimeMillis() - startTime}ms")
+    if (remaining.nonEmpty) {
+      LOGGER.warn(s"failed to clean up stage files:" + remaining.map(_._1.getName).mkString(","))
+    }
+  }
+
+  /**
+   * Delete snapshot file once. Return true if it still exists afterwards
+   * (failed to clean); a file that does not exist counts as cleaned.
+   */
+  private def deleteSnapShotFile(
+      snapshotFilePath: String): Boolean = {
+    val snapshotFile = FileFactory.getCarbonFile(snapshotFilePath)
+    // delete() may also return false for a missing file, so re-check existence.
+    !snapshotFile.delete() && snapshotFile.exists()
+  }
+
+  /**
+   * Delete snapshot file with retry; skipped for Hive partition tables.
+   */
+  private def deleteSnapShotFileWithRetry(
+      table: CarbonTable,
+      snapshotFilePath: String): Unit = {
+    if (!table.isHivePartitionTable) {
+      var failed = deleteSnapShotFile(snapshotFilePath)
+      var retries = DELETE_FILES_RETRY_TIMES
+      while (failed && retries > 0) {
+        failed = deleteSnapShotFile(snapshotFilePath)
+        retries -= 1
+      }
+      if (failed) {
+        LOGGER.warn(s"failed to clean up snapshot file: $snapshotFilePath")
+      }
+    }
   }
 
   /*


Mime
View raw message