carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [20/50] carbondata git commit: [CARBONDATA-2771]block update and delete on table if the compaction is in progress & local dictionary issue for old table
Date Mon, 30 Jul 2018 18:42:46 GMT
[CARBONDATA-2771]block update and delete on table if the compaction is in progress & local
dictionary issue for old table

Problem:
If an update is triggered while compaction is in progress, in some cases there will be a data
mismatch, and sometimes the update fails because the segmentId mapping used in getPartitions may
differ during the update operation.
For older tables, describe formatted was showing true for local dictionary enable.

Solution:
Block update and delete operations on a table if compaction is in progress for that table.
For older tables, the local dictionary property will be null, so describe formatted should show
false for local dictionary enable.

This closes #2541


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/00769570
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/00769570
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/00769570

Branch: refs/heads/branch-1.4
Commit: 007695703def706e3846671d7745001431507277
Parents: b6f1258
Author: akashrn5 <akashnilugal@gmail.com>
Authored: Mon Jul 23 20:29:46 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 20 +++++++
 .../createTable/TestCreateExternalTable.scala   | 32 ++++++-----
 .../TestNonTransactionalCarbonTable.scala       | 12 +++++
 ...ransactionalCarbonTableWithComplexType.scala |  3 ++
 .../CarbonProjectForDeleteCommand.scala         |  4 ++
 .../CarbonProjectForUpdateCommand.scala         |  3 ++
 .../table/CarbonDescribeFormattedCommand.scala  | 57 ++++++++++----------
 .../org/apache/spark/util/AlterTableUtil.scala  | 12 ++++-
 8 files changed, 101 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 5c73259..d5b456c 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -770,6 +770,26 @@ public class SegmentStatusManager {
   }
 
   /**
+   * Return true if the compaction is in progress for the table
+   * @param carbonTable
+   * @return
+   */
+  public static Boolean isCompactionInProgress(CarbonTable carbonTable) {
+    if (carbonTable == null) {
+      return false;
+    }
+    boolean compactionInProgress;
+    ICarbonLock lock = CarbonLockFactory
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier(), LockUsage.COMPACTION_LOCK);
+    try {
+      compactionInProgress = !lock.lockWithRetries(1, 0);
+    } finally {
+      lock.unlock();
+    }
+    return compactionInProgress;
+  }
+
+  /**
    * Return true if insert overwrite is in progress for specified table
    */
   public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
index 519089b..a9b8d57 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateExternalTable.scala
@@ -46,9 +46,23 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll
{
   test("create external table with existing files") {
     assert(new File(originDataPath).exists())
     sql("DROP TABLE IF EXISTS source")
-    if (CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-        CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("false"))
{
+    if (System
+          .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+            CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("true")
||
+        CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+            CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT).equalsIgnoreCase("true"))
{
+
+      intercept[Exception] {
+        // create external table with existing files
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE source
+             |STORED BY 'carbondata'
+             |LOCATION '$storeLocation/origin'
+       """.stripMargin)
+      }
+    } else {
 
       // create external table with existing files
       sql(
@@ -68,17 +82,7 @@ class TestCreateExternalTable extends QueryTest with BeforeAndAfterAll
{
 
       // DROP TABLE should not delete data
       assert(new File(originDataPath).exists())
-    }
-    else {
-      intercept[Exception] {
-        // create external table with existing files
-        sql(
-          s"""
-             |CREATE EXTERNAL TABLE source
-             |STORED BY 'carbondata'
-             |LOCATION '$storeLocation/origin'
-       """.stripMargin)
-      }
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index c7d9caa..62c3df6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2329,12 +2329,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll
{
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
     FileUtils.deleteDirectory(new File(writerPath))
   }
@@ -2355,12 +2358,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll
{
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
     FileUtils.deleteDirectory(new File(writerPath))
   }
@@ -2381,12 +2387,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll
{
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
     FileUtils.deleteDirectory(new File(writerPath))
   }
@@ -2411,12 +2420,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll
{
     val descLoc = sql("describe formatted sdkTable").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
       case Some(row) => assert(row.get(1).toString.contains("name,surname"))
+      case None => assert(false)
     }
 
     checkAnswer(sql("select count(*) from sdkTable"), Seq(Row(1)))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 8a27d0d..7593f38 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -219,12 +219,15 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest
with Befo
     val descLoc = sql("describe formatted localComplex").collect
     descLoc.find(_.get(0).toString.contains("Local Dictionary Enabled")) match {
       case Some(row) => assert(row.get(1).toString.contains("true"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Threshold")) match {
       case Some(row) => assert(row.get(1).toString.contains("10000"))
+      case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
       case Some(row) => assert(row.get(1).toString.contains("name,val1.val2.street,val1.val2.city,val1.val2.WindSpeed,val1.val2.year"))
+      case None => assert(false)
     }
 
     // TODO: Add a validation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f1fa9b3..0127d7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -50,6 +50,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional
table")
     }
 
+    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "compaction", "data delete")
+    }
+
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 964407f..4e9c1af 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -70,6 +70,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional
table")
     }
+    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "compaction", "data update")
+    }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 9e1e0e4..41dfea5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -123,41 +123,44 @@ private[sql] case class CarbonDescribeFormattedCommand(
     }
 
     var isLocalDictEnabled = tblProps.asScala
-      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
-        CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
-    val localDictEnabled = isLocalDictEnabled.split(",") { 0 }
-    results ++= Seq(("Local Dictionary Enabled", localDictEnabled, ""))
-    // if local dictionary is enabled, then only show other properties of local dictionary
-    if (localDictEnabled.toBoolean) {
-      var localDictThreshold = tblProps.asScala
-        .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
-          CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
-      val localDictionaryThreshold = localDictThreshold.split(",")
-      localDictThreshold = localDictionaryThreshold { 0 }
-      results ++= Seq(("Local Dictionary Threshold", localDictThreshold, ""))
-      val columns = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
-      val builder = new StringBuilder
-      columns.foreach { column =>
-        if (column.isLocalDictColumn && !column.isInvisible) {
-          builder.append(column.getColumnName).append(",")
-        }
-      }
-      results ++=
-      Seq(("Local Dictionary Include", getDictColumnString(builder.toString().split(",")),
""))
-      if (tblProps.asScala
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
+      .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
+    if (isLocalDictEnabled.isDefined) {
+      val localDictEnabled = isLocalDictEnabled.get.split(",") { 0 }
+      results ++= Seq(("Local Dictionary Enabled", localDictEnabled, ""))
+      // if local dictionary is enabled, then only show other properties of local dictionary
+      if (localDictEnabled.toBoolean) {
+        var localDictThreshold = tblProps.asScala
+          .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+            CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)
+        val localDictionaryThreshold = localDictThreshold.split(",")
+        localDictThreshold = localDictionaryThreshold { 0 }
+        results ++= Seq(("Local Dictionary Threshold", localDictThreshold, ""))
         val columns = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
         val builder = new StringBuilder
         columns.foreach { column =>
-          if (!column.isLocalDictColumn && !column.isInvisible &&
-              (column.getDataType.equals(DataTypes.STRING) ||
-               column.getDataType.equals(DataTypes.VARCHAR))) {
+          if (column.isLocalDictColumn && !column.isInvisible) {
             builder.append(column.getColumnName).append(",")
           }
         }
         results ++=
-        Seq(("Local Dictionary Exclude", getDictColumnString(builder.toString().split(",")),
""))
+        Seq(("Local Dictionary Include", getDictColumnString(builder.toString().split(",")),
""))
+        if (tblProps.asScala
+          .get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
+          val columns = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+          val builder = new StringBuilder
+          columns.foreach { column =>
+            if (!column.isLocalDictColumn && !column.isInvisible &&
+                (column.getDataType.equals(DataTypes.STRING) ||
+                 column.getDataType.equals(DataTypes.VARCHAR))) {
+              builder.append(column.getColumnName).append(",")
+            }
+          }
+          results ++=
+          Seq(("Local Dictionary Exclude", getDictColumnString(builder.toString().split(",")),
""))
+        }
       }
+    } else {
+      results ++= Seq(("Local Dictionary Enabled", "false", ""))
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/00769570/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 96b191f..cab9de5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -346,7 +346,17 @@ object AlterTableUtil {
         // since thriftTable also holds comment as its property.
         propKeys.foreach { propKey =>
           if (validateTableProperties(propKey)) {
-            tblPropertiesMap.remove(propKey.toLowerCase)
+            // This check is required because for old tables we need to keep same behavior
for it,
+            // meaning, local dictionary should be disabled. To enable we can use set command
for
+            // older tables. So no need to remove from table properties map for unset just
to ensure
+            // for older table behavior. So in case of unset, if enable property is already
present
+            // in map, then just set it to default value of local dictionary which is true.
+            if (!propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE))
{
+              tblPropertiesMap.remove(propKey.toLowerCase)
+            } else {
+              tblPropertiesMap
+                .put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
+            }
           } else {
             val errorMessage = "Error: Invalid option(s): " + propKey
             throw new MalformedCarbonCommandException(errorMessage)


Mime
View raw message