carbondata-commits mailing list archives

From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-1909] Load is failing during insert into operation when load is concurrently done to source table
Date Mon, 29 Jan 2018 10:03:18 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master a597c2f9b -> ab763474f


[CARBONDATA-1909] Load is failing during insert into operation when load is concurrently done to source table

This closes #1693
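
In outline, the failure can be reproduced by loading the source table while an
insert into selects from it. A sketch with illustrative table names, not taken
from the patch:

    // session 1: load new data into the source table
    spark.sql("LOAD DATA INPATH 'hdfs://path/to/data.csv' INTO TABLE source_table")

    // session 2: concurrently insert from the same source table
    spark.sql("INSERT INTO TABLE target_table SELECT * FROM source_table")

If the select in session 2 reads rows from session 1's load before their
dictionary values have been generated, the insert operates on inconsistent
data and can fail.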


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ab763474
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ab763474
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ab763474

Branch: refs/heads/master
Commit: ab763474f9a8191d84ea742a8b6ee615d310999a
Parents: a597c2f
Author: Manohar <manohar.crazy09@gmail.com>
Authored: Wed Dec 20 15:09:45 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Mon Jan 29 15:30:53 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 18 +++++++++++
 .../management/CarbonInsertIntoCommand.scala    | 34 ++++++++++++++++++--
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab763474/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 13c8a42..f46feef 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -865,6 +865,24 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
 
   /**
+   * If an insert into query selects from a source table while a load runs
+   * concurrently on that same table, the select can read newly loaded records
+   * for which dictionary values have not yet been generated, producing
+   * inconsistent data. To avoid this, the dataframe can be persisted to
+   * MEMORY_AND_DISK before the insert into operation is performed. The default
+   * is false, since persisting the dataframe is unnecessary in most cases; users
+   * who run load and insert queries concurrently on a source table can enable this flag.
+   */
+  @CarbonProperty
+  public static final String CARBON_INSERT_PERSIST_ENABLED = "carbon.insert.persist.enable";
+
+  /**
+   * By default, the RDD will not be persisted during an insert into
+   * operation.
+   */
+  public static final String CARBON_INSERT_PERSIST_ENABLED_DEFAULT = "false";
+
+  /**
    * default name of data base
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
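
With the constant in place, the behaviour can be opted into through
CarbonProperties before the concurrent queries are run. A minimal sketch,
assuming the standard CarbonProperties.addProperty API:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Ask CarbonInsertIntoCommand to persist the source dataframe to
    // MEMORY_AND_DISK so a concurrent load cannot change the rows it reads.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED, "true")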

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ab763474/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 626cdba..86d6759 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -18,9 +18,14 @@
 package org.apache.spark.sql.execution.command.management
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand}
+import org.apache.spark.storage.StorageLevel
 
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class CarbonInsertIntoCommand(
@@ -33,7 +38,26 @@ case class CarbonInsertIntoCommand(
   var loadCommand: CarbonLoadDataCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val df = Dataset.ofRows(sparkSession, child)
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    def containsLimit(plan: LogicalPlan): Boolean = {
+      plan.find {
+        case _: GlobalLimit => true
+        case _ => false
+      }.isDefined
+    }
+    val isPersistEnabledUserValue = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED,
+        CarbonCommonConstants.CARBON_INSERT_PERSIST_ENABLED_DEFAULT)
+    val isPersistRequired =
+      isPersistEnabledUserValue.equalsIgnoreCase("true") || containsLimit(child)
+    val df =
+      if (isPersistRequired) {
+        LOGGER.audit("Persist enabled for Insert operation")
+        Dataset.ofRows(sparkSession, child)
+          .persist(StorageLevel.MEMORY_AND_DISK)
+      } else {
+        Dataset.ofRows(sparkSession, child)
+      }
     val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
     loadCommand = CarbonLoadDataCommand(
       databaseNameOp = Some(relation.carbonRelation.databaseName),
@@ -48,7 +72,11 @@ case class CarbonInsertIntoCommand(
       tableInfoOp = None,
       internalOptions = Map.empty,
       partition = partition)
-    loadCommand.processMetadata(sparkSession)
+    val load = loadCommand.processMetadata(sparkSession)
+    if (isPersistRequired) {
+      df.unpersist()
+    }
+    load
   }
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (null != loadCommand) {
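
Note that the command persists the dataframe not only when the flag is set but
also whenever the child plan contains a GlobalLimit node, presumably because a
LIMIT query re-evaluated during a concurrent load could return a different set
of rows on each pass. A sketch of a statement that would be persisted
regardless of the flag (table names are illustrative):

    // LIMIT places a GlobalLimit node in the logical plan, so
    // containsLimit(child) is true and the dataframe is persisted
    spark.sql("INSERT INTO TABLE target_table SELECT * FROM source_table LIMIT 1000")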

