carbondata-commits mailing list archives

From akash...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3902] Fix CDC delete data issue on partition table
Date Wed, 22 Jul 2020 04:40:21 GMT
This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 178aacf  [CARBONDATA-3902] Fix CDC delete data issue on partition table
178aacf is described below

commit 178aacfbb06842c7fe3213c72855e570d1199f8c
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Wed Jul 15 17:01:47 2020 +0530

    [CARBONDATA-3902] Fix CDC delete data issue on partition table
    
    Why is this PR needed?
    When only a delete operation is executed through the CDC merge command on a
    partition table, the loadMetaEntry of segments that have deleted rows is not
    updated with the latest tableupdatestatus file name. Queries on the table then
    return wrong results.
    
    What changes were proposed in this PR?
    Update the loadMetaEntry of segments that have deleted rows with the latest
    tableupdatestatus file name after the delete operation on a partition table,
    if loadDF.count == 0.
    
    NOTE: If the merge has data to be inserted after the update and delete (U & D)
    actions, CarbonInsertIntoDf will update the tableupdatestatus file name in the
    load meta entry during load.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3846
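    
    For context, a minimal sketch of the affected flow, assuming the DataFrame
    merge API exercised in the test changes below (the "target" table and "cdc"
    change set are illustrative, built as in the new test case):
    
        // target is a carbondata table partitioned on "value"; cdc holds only
        // rows flagged deleted=true, so the merge inserts nothing and
        // loadDF.count() is 0 after the delete action.
        target.as("A").merge(cdc.as("B"), "A.key=B.key").
          whenMatched("B.deleted=true").delete().execute()
        // Before this fix, a query here could still see the deleted rows,
        // because loadMetaEntry pointed at a stale tableupdatestatus file.
        sql("select count(*) from target").show()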
---
 .../mutation/merge/CarbonMergeDataSetCommand.scala | 20 ++++++-
 .../spark/testsuite/merge/MergeTestCase.scala      | 67 ++++++++++++++++++----
 2 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index 7abfc42..fb5b200 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -46,9 +46,12 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccum
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.FailureCauses
 
 /**
@@ -164,7 +167,7 @@ case class CarbonMergeDataSetCommand(
         processedRDD)(sparkSession))
 
     loadDF.cache()
-    loadDF.count()
+    val count = loadDF.count()
     val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
       val deltaRdd = sparkSession.read.format("carbon").load(deltaPath).rdd
       val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
@@ -175,6 +178,21 @@ case class CarbonMergeDataSetCommand(
         LOGGER.error("writing of update status file failed")
         throw new CarbonMergeDataSetException("writing of update status file failed")
       }
+      if (carbonTable.isHivePartitionTable) {
+        // If load count is 0 and if merge action contains delete operation, update
+        // tableUpdateStatus file name in loadMeta entry
+        if (count == 0 && hasDelAction && !tuple._1.isEmpty) {
+          val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
+            .getTableStatusFilePath(carbonTable.getTablePath))
+          CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
+            new Segment(loadMetadataDetail.getMergedLoadName,
+              loadMetadataDetail.getSegmentFile)).toSet.asJava,
+            carbonTable,
+            trxMgr.getLatestTrx.toString,
+            true,
+            tuple._2.asJava)
+        }
+      }
       Some(UpdateTableModel(true, trxMgr.getLatestTrx,
         executorErrors, tuple._2, true))
     } else {
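
The added block above, extracted as a standalone sketch with explanatory
comments (identifiers exactly as in the hunk; the two nested conditions are
collapsed into one for readability):

    if (carbonTable.isHivePartitionTable && count == 0 && hasDelAction && !tuple._1.isEmpty) {
      // Read all load metadata entries from the current tablestatus file.
      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
      // Point every segment's metadata at the latest tableupdatestatus file
      // (named after the current transaction), which is the step a
      // pure-delete merge on a partition table previously skipped.
      CarbonUpdateUtil.updateTableMetadataStatus(
        loadMetaDataDetails.map(loadMetadataDetail =>
          new Segment(loadMetadataDetail.getMergedLoadName,
            loadMetadataDetail.getSegmentFile)).toSet.asJava,
        carbonTable,
        trxMgr.getLatestTrx.toString,
        true,
        tuple._2.asJava)
    }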
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index d068507..8c4725a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.spark.testsuite.merge
 
-import java.io.File
 import java.sql.Date
 import java.time.LocalDateTime
 
@@ -626,7 +625,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
   }
 
-  test("check the ccd with partition") {
+  test("check the cdc with partition") {
     sql("drop table if exists target")
 
     val initframe = sqlContext.sparkSession.createDataFrame(Seq(
@@ -643,7 +642,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
     val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
-    var ccd =
+    var cdc =
       sqlContext.sparkSession.createDataFrame(Seq(
         Row("a", "10", false,  0),
         Row("a", null, true, 1),   // a was updated and then deleted
@@ -656,15 +655,15 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
         StructType(Seq(StructField("key", StringType),
           StructField("newValue", StringType),
           StructField("deleted", BooleanType), StructField("time", IntegerType))))
-    ccd.createOrReplaceTempView("changes")
+    cdc.createOrReplaceTempView("changes")
 
-    ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
+    cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
 
     val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
 
     val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
 
-    target.as("A").merge(ccd.as("B"), "A.key=B.key").
+    target.as("A").merge(cdc.as("B"), "A.key=B.key").
       whenMatched("B.deleted=false").
       updateExpr(updateMap).
       whenNotMatched("B.deleted=false").
@@ -676,7 +675,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
   }
 
-  test("check the ccd ") {
+  test("check the cdc ") {
     sql("drop table if exists target")
 
     val initframe = sqlContext.sparkSession.createDataFrame(Seq(
@@ -692,7 +691,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
     val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
-    var ccd =
+    var cdc =
       sqlContext.sparkSession.createDataFrame(Seq(
         Row("a", "10", false,  0),
         Row("a", null, true, 1),   // a was updated and then deleted
@@ -705,15 +704,15 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
         StructType(Seq(StructField("key", StringType),
           StructField("newValue", StringType),
           StructField("deleted", BooleanType), StructField("time", IntegerType))))
-    ccd.createOrReplaceTempView("changes")
+    cdc.createOrReplaceTempView("changes")
 
-    ccd = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
+    cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
 
     val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
 
     val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
 
-    target.as("A").merge(ccd.as("B"), "A.key=B.key").
+    target.as("A").merge(cdc.as("B"), "A.key=B.key").
       whenMatched("B.deleted=false").
       updateExpr(updateMap).
       whenNotMatched("B.deleted=false").
@@ -725,6 +724,47 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
   }
 
+  test("check the cdc delete with partition") {
+    sql("drop table if exists target")
+
+    val initframe = sqlContext.sparkSession.createDataFrame(Seq(
+      Row("a", "0"),
+      Row("a1", "0"),
+      Row("b", "1"),
+      Row("c", "2"),
+      Row("d", "3")
+    ).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
+
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "target")
+      .option("partitionColumns", "value")
+      .mode(SaveMode.Overwrite)
+      .save()
+    val target = sqlContext.read.format("carbondata").option("tableName", "target").load()
+    var cdc =
+      sqlContext.sparkSession.createDataFrame(Seq(
+        Row("a", null, true, 1),
+        Row("a1", null, false, 1),
+        Row("b", null, true, 2),
+        Row("c", null, true, 3),
+        Row("e", "100", false, 6)
+      ).asJava,
+        StructType(Seq(StructField("key", StringType),
+          StructField("newValue", StringType),
+          StructField("deleted", BooleanType), StructField("time", IntegerType))))
+    cdc.createOrReplaceTempView("changes")
+
+    cdc = sql("SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM ( SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key)")
+
+    target.as("A").merge(cdc.as("B"), "A.key=B.key").
+      whenMatched("B.deleted=true").delete().execute()
+
+    assert(getDeleteDeltaFileCount("target", "0") == 1)
+    checkAnswer(sql("select count(*) from target"), Seq(Row(2)))
+    checkAnswer(sql("select * from target order by key"), Seq(Row("a1","0"),Row("d", "3")))
+  }
+
   case class Target (id: Int, value: String, remark: String, mdt: String)
   case class Change (id: Int, value: String, change_type: String, mdt: String)
   private val numInitialRows = 10
@@ -871,8 +911,11 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
   private def getDeleteDeltaFileCount(tableName: String, segment: String): Int = {
     val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
-    val path = CarbonTablePath
+    var path = CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+    if (table.isHivePartitionTable) {
+      path = table.getAbsoluteTableIdentifier.getTablePath
+    }
     val deleteDeltaFiles = FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonCommonConstants
         .DELETE_DELTA_FILE_EXT)
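
A note on the test-helper change above: for a partition table the helper now
lists files recursively from the table path instead of a Segment_<n> directory,
presumably because partitioned loads write their delete delta files under the
partition folders. A minimal verification in the spirit of the new test:

    // After the delete-only merge on the partitioned "target" table, exactly
    // one delete delta file should exist for segment "0" ...
    assert(getDeleteDeltaFileCount("target", "0") == 1)
    // ... and queries must no longer return the deleted rows.
    checkAnswer(sql("select count(*) from target"), Seq(Row(2)))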

