carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1789][CARBONDATA-1791]do not drop the table if the load, insert or insert overwrite is in progress
Date Thu, 07 Dec 2017 14:22:40 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 30457c415 -> 99ec112f2


[CARBONDATA-1789][CARBONDATA-1791]do not drop the table if the load, insert or insert overwrite
is in progress

This closes #1610


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/99ec112f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/99ec112f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/99ec112f

Branch: refs/heads/master
Commit: 99ec112f2e2285d2afb1fd1bcbe4682c3cac9979
Parents: 30457c4
Author: akashrn5 <akashnilugal@gmail.com>
Authored: Mon Dec 4 20:36:15 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Dec 7 19:52:24 2017 +0530

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 23 ++++++
 .../TestLoadTableConcurrentScenario.scala       | 78 ++++++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  6 +-
 .../management/CarbonLoadDataCommand.scala      |  7 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |  8 +-
 .../command/table/CarbonDropTableCommand.scala  |  8 +-
 .../processing/util/CarbonLoaderUtil.java       |  6 +-
 7 files changed, 128 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6d96219..71d5a6a 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -685,4 +686,26 @@ public class SegmentStatusManager {
       return listOfStreamSegments;
     }
   }
+
+  /**
+   * Checks whether any load, insert or insert overwrite is in progress for the table,
+   * so that operations such as drop table can be disallowed while a load is running.
+   * @return true if any segment of the table is in INSERT_IN_PROGRESS or
+   * INSERT_OVERWRITE_IN_PROGRESS status, false otherwise
+   */
+  public static Boolean checkIfAnyLoadInProgressForTable(CarbonTable carbonTable) {
+    Boolean loadInProgress = false;
+    String metaPath = carbonTable.getMetaDataFilepath();
+    LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+              SegmentStatusManager.readLoadMetadata(metaPath);
+    if (listOfLoadFolderDetailsArray.length != 0) {
+      for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) {
+        SegmentStatus segmentStatus = loaddetail.getSegmentStatus();
+        if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+                segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+          loadInProgress = true;
+        }
+      }
+    }
+    return loadInProgress;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
new file mode 100644
index 0000000..6af28c3
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.concurrent
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestLoadTableConcurrentScenario extends QueryTest with BeforeAndAfterAll {
+
+  var carbonTable: CarbonTable = _
+  var metaPath: String = _
+
+  override def beforeAll {
+    sql("use default")
+    sql("drop table if exists drop_concur")
+    sql("drop table if exists rename_concur")
+  }
+
+  test("do not allow drop table when load is in progress") {
+    sql("create table drop_concur(id int, name string) stored by 'carbondata'")
+    sql("insert into drop_concur select 1,'abc'")
+    sql("insert into drop_concur select 1,'abc'")
+    sql("insert into drop_concur select 1,'abc'")
+
+    carbonTable = CarbonEnv.getCarbonTable(Option("default"), "drop_concur")(sqlContext.sparkSession)
+    metaPath = carbonTable.getMetaDataFilepath
+    val listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath)
+    listOfLoadFolderDetailsArray(1).setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS)
+
+    try {
+      sql("drop table drop_concur")
+    } catch {
+      case ex: Throwable => assert(ex.getMessage.contains("Cannot drop table, load or
insert overwrite is in progress"))
+    }
+  }
+
+  test("do not allow rename table when load is in progress") {
+    sql("create table rename_concur(id int, name string) stored by 'carbondata'")
+    sql("insert into rename_concur select 1,'abc'")
+    sql("insert into rename_concur select 1,'abc'")
+
+    carbonTable = CarbonEnv.getCarbonTable(Option("default"), "rename_concur")(sqlContext.sparkSession)
+    metaPath = carbonTable.getMetaDataFilepath
+    val listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath)
+    listOfLoadFolderDetailsArray(1).setSegmentStatus(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
+
+    try {
+      sql("alter table rename_concur rename to rename_concur1")
+    } catch {
+      case ex: Throwable => assert(ex.getMessage.contains("alter rename failed, load,
insert or insert overwrite " +
+        "is in progress for the table"))
+    }
+  }
+
+  override def afterAll: Unit = {
+    sql("use default")
+    sql("drop table if exists drop_concur")
+    sql("drop table if exists rename_concur")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f1b9ecd..6393289 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -453,7 +453,7 @@ object CarbonDataRDDFactory {
       return
     }
     if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-      // update the load entry in table status file for changing the status to failure
+      // update the load entry in table status file for changing the status to marked for
delete
       CommonUtil.updateTableStatusForFailure(carbonLoadModel)
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
@@ -468,7 +468,7 @@ object CarbonDataRDDFactory {
       if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
           status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
           carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-        // update the load entry in table status file for changing the status to failure
+        // update the load entry in table status file for changing the status to marked for
delete
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
@@ -480,7 +480,7 @@ object CarbonDataRDDFactory {
       // if segment is empty then fail the data load
       if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
           !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt))
{
-        // update the load entry in table status file for changing the status to failure
+        // update the load entry in table status file for changing the status to marked for
delete
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 63a625b..e761bea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -96,6 +96,7 @@ case class CarbonLoadDataCommand(
 
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val carbonLoadModel = new CarbonLoadModel()
     try {
       val table = if (tableInfoOp.isDefined) {
         CarbonTable.buildFromTableInfo(tableInfoOp.get)
@@ -120,7 +121,6 @@ case class CarbonLoadDataCommand(
           carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
             CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
-      val carbonLoadModel = new CarbonLoadModel()
       val factPath = if (dataFrame.isDefined) {
         ""
       } else {
@@ -139,7 +139,6 @@ case class CarbonLoadDataCommand(
         hadoopConf
       )
 
-
       try {
         val operationContext = new OperationContext
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
@@ -205,9 +204,13 @@ case class CarbonLoadDataCommand(
         OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
       } catch {
         case CausedBy(ex: NoRetryException) =>
+          // update the load entry in table status file for changing the status to marked
for delete
+          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
           LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
           throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         case ex: Exception =>
+          // update the load entry in table status file for changing the status to marked
for delete
+          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
           LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 8ebaea1..6bf55db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.schema
 
-import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent,
OperationContext, OperationListenerBus}
@@ -83,6 +84,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sparkSession)
       carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
         .asInstanceOf[CarbonRelation].carbonTable
+      // if any load is in progress for table, do not allow rename table
+      if (SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)) {
+        throw new AnalysisException(s"Data loading is in progress for table $oldTableName,
alter " +
+                                    s"table rename operation is not allowed")
+      }
       // invalidate data maps for the old table, see CARBON-1690
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 98f103b..9901f8c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.util.CarbonException
@@ -29,8 +29,10 @@ import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
+import org.apache.carbondata.spark.util.CommonUtil
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,
@@ -71,6 +73,10 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
+      if (SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)) {
+        throw new AnalysisException(s"Data loading is in progress for table $tableName, drop
" +
+                                    s"table operation is not allowed")
+      }
       CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
 
       // fires the event after dropping main table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/99ec112f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index cda31c0..3922e76 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -296,7 +296,11 @@ public final class CarbonLoaderUtil {
           // existing entry needs to be overwritten as the entry will exist with some
           // intermediate status
           int indexToOverwriteNewMetaEntry = 0;
+          Boolean anyLoadInProgress = false;
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+            if (entry.getSegmentStatus().equals(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS))
{
+              anyLoadInProgress = true;
+            }
             if (entry.getLoadName().equals(newMetaEntry.getLoadName())
                 && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
               break;
@@ -304,7 +308,7 @@ public final class CarbonLoaderUtil {
             indexToOverwriteNewMetaEntry++;
           }
           if (listOfLoadFolderDetails.get(indexToOverwriteNewMetaEntry).getSegmentStatus()
==
-              SegmentStatus.MARKED_FOR_DELETE) {
+              SegmentStatus.MARKED_FOR_DELETE && anyLoadInProgress) {
             throw new RuntimeException("It seems insert overwrite has been issued during
load");
           }
           if (insertOverwrite) {


Mime
View raw message