carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [10/50] [abbrv] carbondata git commit: [CARBONDATA-1755] Fixed bug occurring on concurrent insert-overwrite and update
Date Sun, 28 Jan 2018 06:45:39 GMT
[CARBONDATA-1755] Fixed bug occurring on concurrent insert-overwrite and update

Description: Concurrent insert-overwrite and update: a user is able to run an insert-overwrite
job and an update job concurrently.
The updated data will be overwritten by the insert-overwrite job, so there is no point in
running an update job while an insert overwrite is in progress. Therefore, the two operations
should not be allowed to run at the same time.

This closes #1722


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/62ce5b5a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/62ce5b5a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/62ce5b5a

Branch: refs/heads/fgdatamap
Commit: 62ce5b5abaffce13d3e2a169e80c361a126ab65f
Parents: d33d347
Author: SangeetaGulia <sangeeta.gulia@knoldus.in>
Authored: Fri Dec 22 16:38:10 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Jan 9 21:46:12 2018 +0800

----------------------------------------------------------------------
 .../iud/TestInsertUpdateConcurrentTest.scala    | 101 +++++++++++++++++++
 .../CarbonProjectForUpdateCommand.scala         |   9 ++
 2 files changed, 110 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/62ce5b5a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala
new file mode 100644
index 0000000..613b2a6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertUpdateConcurrentTest.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.iud
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestInsertUpdateConcurrentTest extends QueryTest with BeforeAndAfterAll {
+  var df: DataFrame = _
+  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
+
+  override def beforeAll {
+    dropTable()
+    buildTestData()
+  }
+
+  override def afterAll {
+    executorService.shutdownNow()
+    dropTable()
+  }
+
+
+  private def buildTestData(): Unit = {
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Simulate data and write to table orders
+    import sqlContext.implicits._
+
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
+      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
+        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value))
+      .toDF("o_id", "o_date", "o_country", "o_name",
+        "o_phonetype", "o_serialname", "o_salary","o_comment")
+      createTable("orders")
+      createTable("orders_overwrite")
+  }
+
+ private def dropTable() = {
+    sql("DROP TABLE IF EXISTS orders")
+    sql("DROP TABLE IF EXISTS orders_overwrite")
+  }
+
+  private def createTable(tableName: String): Unit ={
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  test("Concurrency test for Insert-Overwrite and update") {
+    val tasks = new java.util.ArrayList[Callable[String]]()
+    tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite"))
+    tasks.add(new QueryTask("update orders set (o_country)=('newCountry') where o_country='china'"))
+    val results: util.List[Future[String]] = executorService.invokeAll(tasks)
+    assert("PASS".equals(results.get(0).get) && "FAIL".equals(results.get(1).get))
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        LOGGER.info("Executing :" + query + Thread.currentThread().getName)
+        sql(query).show()
+      } catch {
+        case _: Exception =>
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62ce5b5a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index bd53a66..20a6bab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -28,9 +28,11 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent,
UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,
@@ -52,6 +54,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)
+    if (isLoadInProgress) {
+      val errorMessage = "Cannot run data loading and update on same table concurrently.
Please " +
+                         "wait for load to finish"
+      LOGGER.error(errorMessage)
+      throw new ConcurrentOperationException(errorMessage)
+    }
 
     // trigger event for Update table
     val operationContext = new OperationContext


Mime
View raw message