carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [43/50] [abbrv] carbondata git commit: [CARBONDATA-2048] Data delete should be rejected when insert overwrite is in progress
Date Sun, 28 Jan 2018 06:46:12 GMT
[CARBONDATA-2048] Data delete should be rejected when insert overwrite is in progress

Add test case and fix bug: data delete should be rejected when insert overwrite is in progress

This closes #1826
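
For context, here is a minimal Scala sketch of the guard this commit introduces on the delete path. SegmentStatusManager.checkIfAnyLoadInProgressForTable and ConcurrentOperationException are the actual classes used in the diff below; the standalone helper wrapper and the CarbonTable import are illustrative only.

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager
    import org.apache.carbondata.spark.exception.ConcurrentOperationException

    // Reject the delete up front if any load (e.g. insert overwrite) is
    // running on the target table, instead of letting the two interleave.
    def rejectIfLoadInProgress(carbonTable: CarbonTable): Unit = {
      if (SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)) {
        throw new ConcurrentOperationException(
          "Cannot run data loading and delete on same table concurrently. " +
          "Please wait for load to finish")
      }
    }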


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f6d96405
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f6d96405
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f6d96405

Branch: refs/heads/fgdatamap
Commit: f6d964050e5f27f60b6483e32366dd3ebfebe2e2
Parents: aba5416
Author: Jacky Li <jacky.likun@qq.com>
Authored: Thu Jan 18 12:03:56 2018 +0800
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Thu Jan 18 17:31:35 2018 +0800

----------------------------------------------------------------------
 .../spark/testsuite/iud/IUDConcurrentTest.scala | 186 -----------------
 .../iud/InsertOverwriteConcurrentTest.scala     | 204 +++++++++++++++++++
 .../CarbonProjectForDeleteCommand.scala         |  18 +-
 3 files changed, 218 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f6d96405/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
deleted file mode 100644
index bb1d26f..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.iud
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
-
-class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
-  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
-  var df: DataFrame = _
-
-  override def beforeAll {
-    dropTable()
-    buildTestData()
-
-    // register a hook on the table that sleeps during load, giving the other command a window to execute
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
-      classOf[WaitingDataMap].getName,
-      "test")
-  }
-
-  private def buildTestData(): Unit = {
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
-
-    // Generate sample data and write it to table orders
-    import sqlContext.implicits._
-
-    val sdf = new SimpleDateFormat("yyyy-MM-dd")
-    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
-      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
-        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,
-        "ordersTable" + value))
-      .toDF("o_id", "o_date", "o_country", "o_name",
-        "o_phonetype", "o_serialname", "o_salary", "o_comment")
-    createTable("orders")
-    createTable("orders_overwrite")
-  }
-
-  private def createTable(tableName: String): Unit = {
-    df.write
-      .format("carbondata")
-      .option("tableName", tableName)
-      .option("tempCSV", "false")
-      .mode(SaveMode.Overwrite)
-      .save()
-  }
-
-  override def afterAll {
-    executorService.shutdownNow()
-    dropTable()
-  }
-
-  override def beforeEach(): Unit = {
-    Global.overwriteRunning = false
-  }
-
-  private def dropTable() = {
-    sql("DROP TABLE IF EXISTS orders")
-    sql("DROP TABLE IF EXISTS orders_overwrite")
-  }
-
-  // submit the input SQL asynchronously and block until it has started running
-  private def runSqlAsync(sql: String): Future[String] = {
-    assert(!Global.overwriteRunning)
-    var count = 0
-    val future = executorService.submit(
-      new QueryTask(sql)
-    )
-    while (!Global.overwriteRunning && count < 1000) {
-      Thread.sleep(10)
-      // bound the wait to avoid an infinite loop in case WaitingDataMap is never invoked
-      count += 1
-    }
-    future
-  }
-
-  test("compaction should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
-    val ex = intercept[Exception]{
-      sql("alter table orders compact 'MINOR'")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains("Cannot run data loading and compaction on same table concurrently"))
-  }
-
-  test("update should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
-    val ex = intercept[Exception] {
-      sql("update orders set (o_country)=('newCountry') where o_country='china'").show
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains("Cannot run data loading and update on same table concurrently"))
-  }
-
-  class QueryTask(query: String) extends Callable[String] {
-    override def call(): String = {
-      var result = "PASS"
-      try {
-        sql(query).collect()
-      } catch {
-        case exception: Exception => LOGGER.error(exception.getMessage)
-          result = "FAIL"
-      }
-      result
-    }
-  }
-
-}
-
-object Global {
-  var overwriteRunning = false
-}
-
-class WaitingDataMap() extends DataMapFactory {
-
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
-
-  override def fireEvent(event: Event): Unit = ???
-
-  override def clear(segmentId: String): Unit = {}
-
-  override def clear(): Unit = {}
-
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
-
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
-
-  override def createWriter(segmentId: String): DataMapWriter = {
-    new DataMapWriter {
-      override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
-
-      override def onBlockletEnd(blockletId: Int): Unit = { }
-
-      override def onBlockEnd(blockId: String): Unit = { }
-
-      override def onBlockletStart(blockletId: Int): Unit = { }
-
-      override def onBlockStart(blockId: String): Unit = {
-        // signal the waiting driver so the second SQL can execute
-        Global.overwriteRunning = true
-
-        // wait for 1 second to let the second SQL finish
-        Thread.sleep(1000)
-      }
-    }
-  }
-
-  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
-
-  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
-}
\ No newline at end of file
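
The deleted test above relies on a simple synchronization pattern: a write-path hook flips a shared flag once the overwrite starts, and the driver polls that flag with a bounded wait before issuing the competing command. Below is a distilled, self-contained Scala sketch of that pattern; all names here are illustrative, while the real test wires it up through Global, WaitingDataMap and runSqlAsync as shown above.

    import java.util.concurrent.{Callable, Executors, Future}

    object SyncSketch {
      // flipped by the write-path hook once the long-running operation starts
      @volatile var operationRunning = false
      private val executor = Executors.newFixedThreadPool(2)

      // submit an action asynchronously, then poll (with a bound) until the
      // hook reports it has started, so the competing command races correctly
      def runAsync(action: () => Unit): Future[String] = {
        val future = executor.submit(new Callable[String] {
          override def call(): String = { action(); "PASS" }
        })
        var count = 0
        while (!operationRunning && count < 1000) {
          Thread.sleep(10) // bounded wait: at most ~10 seconds
          count += 1
        }
        future
      }
    }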

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f6d96405/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
new file mode 100644
index 0000000..25bdf7b
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.iud
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
+
+class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
+  var df: DataFrame = _
+
+  override def beforeAll {
+    dropTable()
+    buildTestData()
+
+    // register a hook on the table that sleeps during load, giving the other command a window to execute
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
+      classOf[WaitingDataMap].getName,
+      "test")
+  }
+
+  private def buildTestData(): Unit = {
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Generate sample data and write it to table orders
+    import sqlContext.implicits._
+
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
+      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
+        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,
+        "ordersTable" + value))
+      .toDF("o_id", "o_date", "o_country", "o_name",
+        "o_phonetype", "o_serialname", "o_salary", "o_comment")
+    createTable("orders")
+    createTable("orders_overwrite")
+  }
+
+  private def createTable(tableName: String): Unit = {
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  override def afterAll {
+    executorService.shutdownNow()
+    dropTable()
+  }
+
+  override def beforeEach(): Unit = {
+    Global.overwriteRunning = false
+  }
+
+  private def dropTable() = {
+    sql("DROP TABLE IF EXISTS orders")
+    sql("DROP TABLE IF EXISTS orders_overwrite")
+  }
+
+  // submit the input SQL asynchronously and block until it has started running
+  private def runSqlAsync(sql: String): Future[String] = {
+    assert(!Global.overwriteRunning)
+    var count = 0
+    val future = executorService.submit(
+      new QueryTask(sql)
+    )
+    while (!Global.overwriteRunning && count < 1000) {
+      Thread.sleep(10)
+      // bound the wait to avoid an infinite loop in case WaitingDataMap is never invoked
+      count += 1
+    }
+    future
+  }
+
+  test("compaction should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
+    val ex = intercept[Exception]{
+      sql("alter table orders compact 'MINOR'")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains("Cannot run data loading and compaction on same table concurrently"))
+  }
+
+  test("update should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
+    val ex = intercept[Exception] {
+      sql("update orders set (o_country)=('newCountry') where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains("Cannot run data loading and update on same table concurrently"))
+  }
+
+  test("delete should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
+    val ex = intercept[Exception] {
+      sql("delete from orders where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains("Cannot run data loading and delete on same table concurrently"))
+  }
+
+  test("drop table should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
+    val ex = intercept[Exception] {
+      sql("drop table if exists orders")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains("Data loading is in progress for table orders, drop table
operation is not allowed"))
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        sql(query).collect()
+      } catch {
+        case exception: Exception => LOGGER.error(exception.getMessage)
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
+}
+
+object Global {
+  var overwriteRunning = false
+}
+
+class WaitingDataMap() extends DataMapFactory {
+
+  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segmentId: String): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
+
+  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+  override def createWriter(segmentId: String): DataMapWriter = {
+    new DataMapWriter {
+      override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
+
+      override def onBlockletEnd(blockletId: Int): Unit = { }
+
+      override def onBlockEnd(blockId: String): Unit = { }
+
+      override def onBlockletStart(blockletId: Int): Unit = { }
+
+      override def onBlockStart(blockId: String): Unit = {
+        // signal the waiting driver so the second SQL can execute
+        Global.overwriteRunning = true
+
+        // wait for 1 second to let the second SQL finish
+        Thread.sleep(1000)
+      }
+    }
+  }
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
+
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f6d96405/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 7022566..874d416 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -17,15 +17,18 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
-import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 /**
  * IUD update delete and compaction framework.
@@ -40,12 +43,19 @@ private[sql] case class CarbonProjectForDeleteCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-      IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)
+    if (isLoadInProgress) {
+      val errorMessage = "Cannot run data loading and delete on same table concurrently. Please " +
+                         "wait for load to finish"
+      LOGGER.error(errorMessage)
+      throw new ConcurrentOperationException(errorMessage)
+    }
+
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val dataFrame = Dataset.ofRows(sparkSession, plan)
     val dataRdd = dataFrame.rdd
 
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
     // trigger event for Delete from table
     val operationContext = new OperationContext
     val deleteFromTablePreEvent: DeleteFromTablePreEvent =
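
Note the design choice here: the new check fails fast with a ConcurrentOperationException rather than blocking the delete until the load completes, which keeps lock hold times short and surfaces the conflict to the caller. A hypothetical caller-side handler is sketched below; the SparkSession parameter and the retry/back-off policy are illustrative only and not part of this commit.

    import org.apache.spark.sql.SparkSession

    // hypothetical helper: retry a delete once if it is rejected because a
    // concurrent load is in progress (retry policy is illustrative only)
    def deleteWithRetry(spark: SparkSession, stmt: String): Unit = {
      try {
        spark.sql(stmt).show()
      } catch {
        case e: Exception if e.getMessage.contains("concurrently") =>
          Thread.sleep(5000) // back off while the load finishes
          spark.sql(stmt).show()
      }
    }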

