carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-2748] Blocking concurrent load if any column included as dictionary
Date Thu, 19 Jul 2018 16:07:18 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 892594743 -> bb09baca5


[CARBONDATA-2748] Blocking concurrent load if any column included as dictionary

This closes #2513


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bb09baca
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bb09baca
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bb09baca

Branch: refs/heads/master
Commit: bb09baca5da248fb7629af862de76d737d9acd90
Parents: 8925947
Author: rahul <rahul.kumar@knoldus.in>
Authored: Mon Jul 16 23:10:22 2018 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Jul 19 21:34:30 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/locks/LockUsage.java |  2 +
 .../StandardPartitionTableLoadingTestCase.scala |  1 -
 .../management/CarbonLoadDataCommand.scala      | 46 +++++++++++++++++++-
 3 files changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb09baca/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index bd6b11d..b16c3f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -35,4 +35,6 @@ public class LockUsage {
   public static final String DROP_TABLE_LOCK = "droptable.lock";
   public static final String STREAMING_LOCK = "streaming.lock";
   public static final String DATAMAP_STATUS_LOCK = "datamapstatus.lock";
+  public static final String CONCURRENT_LOAD_LOCK = "concurrentload.lock";
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb09baca/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index b61583e..e3e8e68 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -305,7 +305,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
         |  utilization int,salary int)
         | PARTITIONED BY (workgroupcategory int, empname String, designation String)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname')
       """.stripMargin)
 
     val tasks = new util.ArrayList[Callable[String]]()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bb09baca/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 86a2bc1..3bf9595 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.UUID
+import java.util.{List, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -54,10 +54,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
 import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
@@ -148,6 +149,7 @@ case class CarbonLoadDataCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    var concurrentLoadLock: Option[ICarbonLock] = None
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
     currPartitions = if (table.isHivePartitionTable) {
       CarbonFilters.getCurrentPartitions(
@@ -253,6 +255,7 @@ case class CarbonLoadDataCommand(
         }
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+        concurrentLoadLock = acquireConcurrentLoadLock()
         // Clean up the old invalid segment data before creating a new entry for new load.
         SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
         // add the start entry for the new load in the table status file
@@ -345,6 +348,7 @@ case class CarbonLoadDataCommand(
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex
       } finally {
+        releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
         // Once the data load is successful delete the unwanted partition files
         try {
           val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
@@ -375,6 +379,44 @@ case class CarbonLoadDataCommand(
     Seq.empty
   }
 
+  private def acquireConcurrentLoadLock(): Option[ICarbonLock] = {
+    val isConcurrentLockRequired = table.getAllDimensions.asScala
+      .exists(cd => cd.hasEncoding(Encoding.DICTIONARY) &&
+                    !cd.hasEncoding(Encoding.DIRECT_DICTIONARY))
+
+    if (isConcurrentLockRequired) {
+      var concurrentLoadLock: ICarbonLock = CarbonLockFactory.getCarbonLockObj(
+        table.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+        LockUsage.CONCURRENT_LOAD_LOCK)
+      val retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
+      val maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+          CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
+      if (!(isConcurrentLockRequired &&
+            concurrentLoadLock.lockWithRetries(retryCount, maxTimeout))) {
+        throw new RuntimeException(table.getDatabaseName + "." + table.getTableName +
+                                   " having dictionary column. so concurrent load is not supported")
+      }
+      return Some(concurrentLoadLock)
+    }
+    return None
+  }
+
+  private def releaseConcurrentLoadLock(concurrentLoadLock: Option[ICarbonLock],
+      LOGGER: LogService): Unit = {
+    if (concurrentLoadLock.isDefined) {
+      if (concurrentLoadLock.get.unlock()) {
+        LOGGER.info("concurrent_load lock for table" + table.getTablePath +
+                    "has been released successfully")
+      } else {
+        LOGGER.error(
+          "Unable to unlock concurrent_load lock for table" + table.getTablePath);
+      }
+    }
+  }
+
   private def loadDataUsingOnePass(
       sparkSession: SparkSession,
       carbonProperty: CarbonProperties,


Mime
View raw message