carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [39/50] [abbrv] carbondata git commit: [CARBONDATA-1896] Clean files operation improvement
Date Tue, 09 Jan 2018 04:02:07 GMT
[CARBONDATA-1896] Clean files operation improvement

Problem:
(1) During session startup, the clean operation marks every
INSERT_OVERWRITE_IN_PROGRESS or INSERT_IN_PROGRESS segment as
MARKED_FOR_DELETE in the tablestatus file. This does not take other
parallel sessions into account: if another session's data load is
IN_PROGRESS while this session is starting up, that running load is also
marked MARKED_FOR_DELETE, regardless of its actual status.
(2) Cleaning stale segments during session startup also increases the
session startup time.

Solution:
(1) Take a SEGMENT_LOCK on the new segment while loading, so that stale
IN_PROGRESS segments can be told apart from live loads.
(2) While cleaning segments, consider both the tablestatus file and the
SEGMENT_LOCK.
Remove the stale-file cleanup from session startup; it can instead be
triggered manually on the required tables through the existing CLEAN FILES
DDL, or the next load on the table will clean the stale segments. A minimal
sketch of the locking pattern follows.
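
For illustration only, here is a minimal sketch of the per-segment locking
pattern this change introduces in CarbonDataRDDFactory, using the lock
helpers that appear in the diff below (CarbonLockFactory,
CarbonTablePath.addSegmentPrefix, LockUsage). The loadWithSegmentLock helper
and its doLoad parameter are hypothetical names added only for this sketch:

  import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.core.util.path.CarbonTablePath

  // Hypothetical helper showing the locking pattern; not part of the patch.
  def loadWithSegmentLock(carbonTable: CarbonTable, segmentId: String)(doLoad: => Unit): Unit = {
    val segmentLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getAbsoluteTableIdentifier,
      CarbonTablePath.addSegmentPrefix(segmentId) + LockUsage.LOCK)
    try {
      if (segmentLock.lockWithRetries()) {
        // Load data into the new segment while holding its SEGMENT_LOCK.
        // A concurrent clean that cannot acquire this lock treats the
        // IN_PROGRESS tablestatus entry as a live load and leaves it alone.
        doLoad
      }
    } finally {
      segmentLock.unlock()
    }
  }

Stale segments that remain can still be cleaned on demand through the
existing DDL, e.g. CLEAN FILES FOR TABLE db_name.table_name (placeholder
names), or by the next load on the table.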

This closes #1702


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c8031be
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c8031be
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c8031be

Branch: refs/heads/branch-1.3
Commit: 3c8031beabae5dbd0f909269f1c2b9be583a1a96
Parents: 2dbe6c9
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Wed Dec 20 22:35:31 2017 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Fri Jan 5 16:12:59 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/util/CommonUtil.scala      |  52 ++----
 .../carbondata/spark/util/DataLoadingUtil.scala |  18 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++++++--------
 .../org/apache/spark/sql/CarbonSession.scala    |  12 --
 .../management/CarbonCleanFilesCommand.scala    |  16 +-
 .../management/CarbonLoadDataCommand.scala      |   6 +-
 .../processing/util/DeleteLoadFolders.java      | 164 ++++++++++++-------
 7 files changed, 231 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index e31f838..3f1f305 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -38,24 +38,22 @@ import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
 
@@ -834,7 +832,7 @@ object CommonUtil {
   }
 
   /**
-   * The in-progress segments which are left when the driver is down will be marked as deleted
+   * The in-progress segments which are in stale state will be marked as deleted
    * when driver is initializing.
    * @param databaseLocation
    * @param dbName
@@ -864,43 +862,19 @@ object CommonUtil {
                   val segmentStatusManager = new SegmentStatusManager(identifier)
                   val carbonLock = segmentStatusManager.getTableStatusLock
                   try {
-                    if (carbonLock.lockWithRetries) {
-                      LOGGER.info("Acquired lock for table" +
-                        identifier.getCarbonTableIdentifier.getTableUniqueName
-                        + " for table status updation")
-                      val listOfLoadFolderDetailsArray =
-                        SegmentStatusManager.readLoadMetadata(
-                          carbonTablePath.getMetadataDirectoryPath)
-                      var loadInprogressExist = false
-                      val staleFolders: Seq[CarbonFile] = Seq()
-                      listOfLoadFolderDetailsArray.foreach { load =>
-                        if (load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-                            load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
-                          load.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
-                          staleFolders :+ FileFactory.getCarbonFile(
-                            carbonTablePath.getCarbonDataDirectoryPath("0", load.getLoadName))
-                          loadInprogressExist = true
-                        }
-                      }
-                      if (loadInprogressExist) {
-                        SegmentStatusManager
-                          .writeLoadDetailsIntoFile(tableStatusFile, listOfLoadFolderDetailsArray)
-                        staleFolders.foreach(CarbonUtil.deleteFoldersAndFiles(_))
-                      }
-                    }
-                  } finally {
-                    if (carbonLock.unlock) {
-                      LOGGER.info(s"Released table status lock for table " +
-                                  s"${identifier.getCarbonTableIdentifier.getTableUniqueName}")
-                    } else {
-                    LOGGER.error(s"Error while releasing table status lock for table " +
-                                  s"${identifier.getCarbonTableIdentifier.getTableUniqueName}")
-                    }
-                  }
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
+                  DataLoadingUtil.deleteLoadsAndUpdateMetadata(
+                    isForceDeletion = true, carbonTable)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                                s"${ identifier.getCarbonTableIdentifier.getTableUniqueName }")
                 }
               }
             }
           }
+        }
       }
     } catch {
       case s: java.io.FileNotFoundException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index faba26d..b04a58e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -17,9 +17,8 @@
 
 package org.apache.carbondata.spark.util
 
-import scala.collection.immutable
+import scala.collection.{immutable, mutable}
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
@@ -332,7 +331,9 @@ object DataLoadingUtil {
     val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
     if (details != null && details.nonEmpty) for (oneRow <- details) {
       if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
-           SegmentStatus.COMPACTED == oneRow.getSegmentStatus) &&
+           SegmentStatus.COMPACTED == oneRow.getSegmentStatus ||
+           SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus ||
+           SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) &&
           oneRow.getVisibility.equalsIgnoreCase("true")) {
         return true
       }
@@ -357,9 +358,12 @@ object DataLoadingUtil {
         DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
           absoluteTableIdentifier,
           isForceDeletion,
-          details
+          details,
+          carbonTable.getMetaDataFilepath
         )
 
+      var updationCompletionStaus = false
+
       if (isUpdationRequired) {
         try {
           // Update load metadate file after cleaning deleted nodes
@@ -386,9 +390,15 @@ object DataLoadingUtil {
             LOGGER.error(errorMsg)
             throw new Exception(errorMsg + " Please try after some time.")
           }
+          updationCompletionStaus = true
         } finally {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
         }
+        if (updationCompletionStaus) {
+          DeleteLoadFolders
+            .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
+              carbonTable.getMetaDataFilepath, isForceDeletion)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 18e9181..c6a1178 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -57,7 +57,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -301,7 +301,6 @@ object CarbonDataRDDFactory {
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
     // Check if any load need to be deleted before loading new data
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
@@ -315,87 +314,92 @@ object CarbonDataRDDFactory {
     val isSortTable = carbonTable.getNumberOfSortColumns > 0
     val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
 
+    val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
+
     try {
-      if (updateModel.isDefined) {
-        res = loadDataFrameForUpdate(
-          sqlContext,
-          dataFrame,
-          carbonLoadModel,
-          updateModel,
-          carbonTable)
-        res.foreach { resultOfSeg =>
-          resultOfSeg.foreach { resultOfBlock =>
-            if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
-              loadStatus = SegmentStatus.LOAD_FAILURE
-              if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
-                updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-                updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
-              } else {
-                updateModel.get.executorErrors = resultOfBlock._2._2
+      if (segmentLock.lockWithRetries()) {
+        if (updateModel.isDefined) {
+          res = loadDataFrameForUpdate(
+            sqlContext,
+            dataFrame,
+            carbonLoadModel,
+            updateModel,
+            carbonTable)
+          res.foreach { resultOfSeg =>
+            resultOfSeg.foreach { resultOfBlock =>
+              if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
+                loadStatus = SegmentStatus.LOAD_FAILURE
+                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
+                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
+                } else {
+                  updateModel.get.executorErrors = resultOfBlock._2._2
+                }
+              } else if (resultOfBlock._2._1.getSegmentStatus ==
+                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
               }
-            } else if (resultOfBlock._2._1.getSegmentStatus ==
-                       SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-              loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
-              updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-              updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
             }
           }
-        }
-      } else {
-        status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
-          loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
-        } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
-            dataFrame, carbonLoadModel, hadoopConf)
-        } else if (dataFrame.isDefined) {
-          loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
         } else {
-          loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
-        }
-        CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
-        val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
-        if (status.nonEmpty) {
-          status.foreach { eachLoadStatus =>
-            val state = newStatusMap.get(eachLoadStatus._1)
-            state match {
-              case Some(SegmentStatus.LOAD_FAILURE) =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
-              case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
-                if eachLoadStatus._2._1.getSegmentStatus ==
-                   SegmentStatus.SUCCESS =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
-              case _ =>
-                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
-            }
+          status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
+            loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
+          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
+              dataFrame, carbonLoadModel, hadoopConf)
+          } else if (dataFrame.isDefined) {
+            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
+          } else {
+            loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
           }
-
-          newStatusMap.foreach {
-            case (key, value) =>
-              if (value == SegmentStatus.LOAD_FAILURE) {
-                loadStatus = SegmentStatus.LOAD_FAILURE
-              } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
-                         loadStatus!= SegmentStatus.LOAD_FAILURE) {
-                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
+          val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
+          if (status.nonEmpty) {
+            status.foreach { eachLoadStatus =>
+              val state = newStatusMap.get(eachLoadStatus._1)
+              state match {
+                case Some(SegmentStatus.LOAD_FAILURE) =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
+                case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+                  if eachLoadStatus._2._1.getSegmentStatus ==
+                     SegmentStatus.SUCCESS =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
+                case _ =>
+                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
               }
-          }
-        } else {
-          // if no value is there in data load, make load status Success
-          // and data load flow executes
-          if (dataFrame.isDefined && updateModel.isEmpty) {
-            val rdd = dataFrame.get.rdd
-            if (rdd.partitions == null || rdd.partitions.length == 0) {
-              LOGGER.warn("DataLoading finished. No data was loaded.")
-              loadStatus = SegmentStatus.SUCCESS
+            }
+
+            newStatusMap.foreach {
+              case (key, value) =>
+                if (value == SegmentStatus.LOAD_FAILURE) {
+                  loadStatus = SegmentStatus.LOAD_FAILURE
+                } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+                           loadStatus != SegmentStatus.LOAD_FAILURE) {
+                  loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
+                }
             }
           } else {
-            loadStatus = SegmentStatus.LOAD_FAILURE
+            // if no value is there in data load, make load status Success
+            // and data load flow executes
+            if (dataFrame.isDefined && updateModel.isEmpty) {
+              val rdd = dataFrame.get.rdd
+              if (rdd.partitions == null || rdd.partitions.length == 0) {
+                LOGGER.warn("DataLoading finished. No data was loaded.")
+                loadStatus = SegmentStatus.SUCCESS
+              }
+            } else {
+              loadStatus = SegmentStatus.LOAD_FAILURE
+            }
           }
-        }
 
-        if (loadStatus != SegmentStatus.LOAD_FAILURE &&
-            partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-          loadStatus = partitionStatus
+          if (loadStatus != SegmentStatus.LOAD_FAILURE &&
+              partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+            loadStatus = partitionStatus
+          }
         }
       }
     } catch {
@@ -420,6 +424,8 @@ object CarbonDataRDDFactory {
         }
         LOGGER.info(errorMessage)
         LOGGER.error(ex)
+    } finally {
+      segmentLock.unlock()
     }
     // handle the status file updation for the update cmd.
     if (updateModel.isDefined) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index c9b134e..b4e11c1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -181,18 +181,6 @@ object CarbonSession {
         }
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)
-        try {
-          val databases = session.sessionState.catalog.listDatabases()
-          databases.foreach(dbName => {
-            val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, session)
-            CommonUtil.cleanInProgressSegments(databaseLocation, dbName)
-          })
-        } catch {
-          case e: Throwable =>
-            // catch all exceptions to avoid CarbonSession initialization failure
-          LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-            .error(e, "Failed to clean in progress segments")
-        }
         // Register a successfully instantiated context to the singleton. This should be at the
         // end of the class definition so that the singleton is updated only if there is no
         // exception in the construction of the instance.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 342acd4..303c3ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -24,8 +24,10 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Clean data in table
@@ -96,7 +98,19 @@ case class CarbonCleanFilesCommand(
       partitions)
   }
 
+  // Clean garbage data in all tables in all databases
   private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
-    // Waiting to implement
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        CommonUtil.cleanInProgressSegments(databaseLocation, dbName)
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error(e, "Failed to clean in progress segments")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 46fe24f..4c1e748 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -61,7 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.format
@@ -179,6 +179,8 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
+        // Clean up the old invalid segment data before creating a new entry for new load.
+        DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
@@ -481,8 +483,6 @@ case class CarbonLoadDataCommand(
           "tableMeta",
           c).asInstanceOf[CatalogTable]
     }.head
-    // Clean up the old invalid segment data.
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
     val currentPartitions =
       CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
     // Clean up the alreday dropped partitioned data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c8031be/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 845f629..02ab1d8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -24,10 +24,14 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -55,63 +59,66 @@ public final class DeleteLoadFolders {
     return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
   }
 
-  private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
-
-    boolean status = false;
-    try {
-      if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-        CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-        CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (CarbonTablePath.isCarbonDataFile(file.getName())
-                || CarbonTablePath.isCarbonIndexFile(file.getName())
-                || CarbonTablePath.isPartitionMapFile(file.getName()));
-          }
-        });
-
-        //if there are no fact and msr metadata files present then no need to keep
-        //entry in metadata.
-        if (filesToBeDeleted.length == 0) {
-          status = true;
-        } else {
-
-          for (CarbonFile eachFile : filesToBeDeleted) {
-            if (!eachFile.delete()) {
-              LOGGER.warn("Unable to delete the file as per delete command "
-                  + eachFile.getAbsolutePath());
-              status = false;
-            } else {
+  public static void physicalFactAndMeasureMetadataDeletion(
+      AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
+    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+    for (LoadMetadataDetails oneLoad : currentDetails) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+        String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
+        boolean status = false;
+        try {
+          if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+            CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+            CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+              @Override public boolean accept(CarbonFile file) {
+                return (CarbonTablePath.isCarbonDataFile(file.getName())
+                    || CarbonTablePath.isCarbonIndexFile(file.getName())
+                    || CarbonTablePath.isPartitionMapFile(file.getName()));
+              }
+            });
+
+            //if there are no fact and msr metadata files present then no need to keep
+            //entry in metadata.
+            if (filesToBeDeleted.length == 0) {
               status = true;
+            } else {
+
+              for (CarbonFile eachFile : filesToBeDeleted) {
+                if (!eachFile.delete()) {
+                  LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                      .getAbsolutePath());
+                  status = false;
+                } else {
+                  status = true;
+                }
+              }
             }
+            // need to delete the complete folder.
+            if (status) {
+              if (!file.delete()) {
+                LOGGER.warn(
+                    "Unable to delete the folder as per delete command " + file.getAbsolutePath());
+              }
+            }
+
+          } else {
+            LOGGER.warn("Files are not found in segment " + path
+                + " it seems, files are already being deleted");
           }
+        } catch (IOException e) {
+          LOGGER.warn("Unable to delete the file as per delete command " + path);
         }
-        // need to delete the complete folder.
-        if (status) {
-          if (!file.delete()) {
-            LOGGER.warn("Unable to delete the folder as per delete command "
-                + file.getAbsolutePath());
-            status = false;
-          }
-        }
-
-      } else {
-        LOGGER.warn("Files are not found in segment " + path
-            + " it seems, files are already being deleted");
-        status = true;
       }
-    } catch (IOException e) {
-      LOGGER.warn("Unable to delete the file as per delete command " + path);
     }
-
-    return status;
-
   }
 
   private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
       boolean isForceDelete) {
     if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())
+        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
+        SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
+        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
         && oneLoad.getVisibility().equalsIgnoreCase("true")) {
       if (isForceDelete) {
         return true;
@@ -125,27 +132,72 @@ public final class DeleteLoadFolders {
     return false;
   }
 
+  private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
+      boolean isForceDelete) {
+    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
+        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
+      if (isForceDelete) {
+        return true;
+      }
+      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+    }
+
+    return false;
+  }
+
+  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
+      String metadataPath) {
+    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+    for (LoadMetadataDetails oneLoad : currentDetails) {
+      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
+        return oneLoad;
+      }
+    }
+    return null;
+  }
+
   public static boolean deleteLoadFoldersFromFileSystem(
       AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
-      LoadMetadataDetails[] details) {
+      LoadMetadataDetails[] details, String metadataPath) {
     boolean isDeleted = false;
-
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+          try {
+            if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+                || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+              if (segmentLock.lockWithRetries(1, 5)) {
+                LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
+                LoadMetadataDetails currentDetails =
+                    getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
+                if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
+                    isForceDelete)) {
+                  oneLoad.setVisibility("false");
+                  isDeleted = true;
+                  LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+                }
+              } else {
+                LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
+                return isDeleted;
+              }
+            } else {
+              oneLoad.setVisibility("false");
+              isDeleted = true;
+              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+            }
+          } finally {
+            segmentLock.unlock();
+            LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
           }
         }
       }
     }
-
     return isDeleted;
   }
 
-
 }

