carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [15/50] [abbrv] carbondata git commit: [CARBONDATA-1486] Fixed issue of table status updation on insert overwrite failure and exception thrown while deletion of stale folders
Date Tue, 10 Oct 2017 03:08:02 GMT
[CARBONDATA-1486] Fixed issue of table status updation on insert overwrite failure and exception
thrown while deletion of stale folders

This closes #1368


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c4f312fa
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c4f312fa
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c4f312fa

Branch: refs/heads/streaming_ingest
Commit: c4f312fa1dda5774fac715aa96fde3094f42a592
Parents: d304ef9
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Mon Sep 18 18:03:04 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Sep 28 01:47:56 2017 +0800

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 23 +++++++++++++++++--
 .../load/DataLoadProcessorStepOnSpark.scala     |  2 +-
 .../carbondata/spark/util/CommonUtil.scala      | 24 ++++++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  9 +++++---
 .../newflow/sort/SortStepRowUtil.java           |  3 +--
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  2 +-
 6 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 7c2d157..9fe003f 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -289,9 +289,18 @@ public final class CarbonLoaderUtil {
           newMetaEntry.setLoadName(segmentId);
           loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
           loadModel.setSegmentId(segmentId);
+          // Exception should be thrown if:
+          // 1. If insert overwrite is in progress and any other load or insert operation
+          // is triggered
+          // 2. If load or insert into operation is in progress and insert overwrite operation
+          // is triggered
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
            if (entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
               throw new RuntimeException("Already insert overwrite is in progress");
+            } else if (
+                newMetaEntry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())
+                    && entry.getLoadStatus().equals(LoadStatusType.IN_PROGRESS.getMessage())) {
+              throw new RuntimeException("Already insert into or load is in progress");
             }
           }
           listOfLoadFolderDetails.add(newMetaEntry);
@@ -318,7 +327,11 @@ public final class CarbonLoaderUtil {
                 // For insert overwrite, we will delete the old segment folder immediately
                 // So collect the old segments here
                 String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
-                staleFolders.add(FileFactory.getCarbonFile(path));
+                // add to the deletion list only if file exist else HDFS file system will throw
+                // exception while deleting the file if file path does not exist
+                if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+                  staleFolders.add(FileFactory.getCarbonFile(path));
+                }
               }
             }
           }
@@ -328,7 +341,13 @@ public final class CarbonLoaderUtil {
             .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
         // Delete all old stale segment folders
         for (CarbonFile staleFolder : staleFolders) {
-          CarbonUtil.deleteFoldersAndFiles(staleFolder);
+          // try block is inside for loop because even if there is failure in deletion of 1 stale
+          // folder still remaining stale folders should be deleted
+          try {
+            CarbonUtil.deleteFoldersAndFiles(staleFolder);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Failed to delete stale folder: " + e.getMessage());
+          }
         }
         status = true;
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index aaf7926..6943dcb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -136,7 +136,7 @@ object DataLoadProcessorStepOnSpark {
 
       override def next(): CarbonRow = {
         val row =
-          new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters, true))
+          new CarbonRow(SortStepRowUtil.convertRow(rows.next().getData, sortParameters))
         rowCounter.add(1)
         row
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index f123624..5040e69 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -537,6 +537,30 @@ object CommonUtil {
     }
   }
 
+  /**
+   * This method will update the load failure entry in the table status file
+   *
+   * @param model
+   */
+  def updateTableStatusForFailure(
+      model: CarbonLoadModel): Unit = {
+    // in case if failure the load status should be "Marked for delete" so that it will be taken
+    // care during clean up
+    val loadStatus = CarbonCommonConstants.MARKED_FOR_DELETE
+    // always the last entry in the load metadata details will be the current load entry
+    val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
+    CarbonLoaderUtil
+      .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true)
+    val updationStatus = CarbonLoaderUtil.recordLoadMetadata(loadMetaEntry, model, false, false)
+    if (!updationStatus) {
+      sys
+        .error(s"Failed to update failure entry in table status for ${
+          model
+            .getDatabaseName
+        }.${ model.getTableName }")
+    }
+  }
+
   def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
     val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0edfccf..3d67a79 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1069,13 +1069,14 @@ object CarbonDataRDDFactory {
         return
       }
       if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+        // update the load entry in table status file for changing the status to failure
+        CommonUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
-        updateStatus(status, loadStatus)
         throw new Exception(errorMessage)
       } else {
         // check if data load fails due to bad record and throw data load failure due to
@@ -1083,16 +1084,19 @@ object CarbonDataRDDFactory {
         if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
             status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+          // update the load entry in table status file for changing the status to failure
+          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
           LOGGER.info("********starting clean up**********")
           CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           throw new Exception(status(0)._2._2.errorMsg)
         }
         // if segment is empty then fail the data load
        if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+          // update the load entry in table status file for changing the status to failure
+          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
           LOGGER.info("********starting clean up**********")
           CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
           LOGGER.info("********clean up done**********")
@@ -1100,7 +1104,6 @@ object CarbonDataRDDFactory {
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
                        " as there is no data to load")
           LOGGER.warn("Cannot write load metadata file as data load failed")
-          updateStatus(status, CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           throw new Exception("No Data to load")
         }
         writeDictionary(carbonLoadModel, result, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
index 5238c3c..62434bc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -21,8 +21,7 @@ import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 
 public class SortStepRowUtil {
-  public static Object[] convertRow(Object[] data, SortParameters parameters,
-      boolean needConvertDecimalToByte) {
+  public static Object[] convertRow(Object[] data, SortParameters parameters) {
     int measureCount = parameters.getMeasureColCount();
     int dimensionCount = parameters.getDimColCount();
     int complexDimensionCount = parameters.getComplexDimColCount();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c4f312fa/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 1455365..e3bbdcb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -183,7 +183,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    * @return sorted row
    */
   public Object[] next() {
-    return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters, false);
+    return SortStepRowUtil.convertRow(getSortedRecordFromFile(), parameters);
   }
 
   /**


Mime
View raw message