carbondata-commits mailing list archives

From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-1860][PARTITION] Support insertoverwrite for a specific partition
Date Thu, 21 Dec 2017 03:37:21 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master bc2e897cb -> 47aafabb3


[CARBONDATA-1860][PARTITION] Support insertoverwrite for a specific partition

This closes #1700
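
For reference, the new StandardPartitionTableOverwriteTestCase added below shows the statement shape this change enables: an insert overwrite that names a static partition replaces only that partition's data instead of the whole table. A minimal usage sketch, adapted from that test (table and column names come from the test; sql() is the helper available in the test suite):

    // Illustrative only: overwrite one static partition; other partitions keep their data.
    sql(
      """
        | insert overwrite table staticpartitiondateinsert
        | PARTITION(projectenddate='2016-06-29', doj='2010-12-29 00:00:00')
        | select empno, empname, designation, workgroupcategory, workgroupcategoryname,
        |        deptno, projectjoindate, attendance, deptname, projectcode, utilization, salary
        | from originTable where projectenddate=cast('2016-06-29' as Date)
      """.stripMargin)

After four plain inserts followed by this overwrite, the test expects the named partition to contain exactly one copy of the matching originTable rows.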


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/47aafabb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/47aafabb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/47aafabb

Branch: refs/heads/master
Commit: 47aafabb3bc2f64d828ed129746a5aea1bb5454c
Parents: bc2e897
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Tue Dec 19 13:19:15 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Dec 21 09:06:49 2017 +0530

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    | 33 ++-----
 .../StandardPartitionTableDropTestCase.scala    |  2 +-
 ...tandardPartitionTableOverwriteTestCase.scala | 99 ++++++++++++++++++++
 .../spark/rdd/CarbonDropPartitionRDD.scala      |  5 +-
 .../management/CarbonLoadDataCommand.scala      | 99 ++++++++++++++++++--
 .../CarbonStandardAlterTableDropPartition.scala | 24 +++--
 6 files changed, 215 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index f60b69f..b44f99b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -228,7 +228,7 @@ public class PartitionMapFileStore {
       }
       PartitionMapper mapper = new PartitionMapper();
       mapper.setPartitionMap(partitionMap);
-      String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT;
+      String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp";
       writePartitionFile(mapper, path);
     }
   }
@@ -241,33 +241,14 @@ public class PartitionMapFileStore {
    * @param success
    */
   public void commitPartitions(String segmentPath, final String uniqueId, boolean success) {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+    CarbonFile carbonFile = FileFactory
+        .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp");
     // write partition info to new file.
     if (carbonFile.exists()) {
-      CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
-        }
-      });
-      CarbonFile latestFile = null;
-      for (CarbonFile mapFile: carbonFiles) {
-        if (mapFile.getName().startsWith(uniqueId)) {
-          latestFile = mapFile;
-        }
-      }
-      if (latestFile != null) {
-        for (CarbonFile mapFile : carbonFiles) {
-          if (latestFile != mapFile) {
-            // Remove old files in case of success scenario
-            if (success) {
-              mapFile.delete();
-            }
-          }
-        }
-      }
-      // If it is failure scenario then remove the new file.
-      if (!success && latestFile != null) {
-        latestFile.delete();
+      if (success) {
+        carbonFile.renameForce(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT);
+      } else {
+        carbonFile.delete();
       }
     }
   }
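
Note on the change above: instead of listing every partition-map file and pruning afterwards, the map is now written to a temporary file (the partition-map path plus a ".tmp" suffix), and commitPartitions either renames it to its final name on success or deletes it on failure, so a reader never sees a half-written map under the final name. A minimal, generic sketch of the same write-then-rename-or-delete idea using plain java.nio rather than CarbonData's CarbonFile API:

    // Generic sketch: publish a file by writing a ".tmp" file first, then renaming it
    // into place on success or deleting it on failure.
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths, StandardCopyOption}

    object TmpCommitSketch {
      def writeAndCommit(targetPath: String, content: String, success: Boolean): Unit = {
        val tmp = Paths.get(targetPath + ".tmp")
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
        if (success) {
          // success: expose the new file under its final name
          Files.move(tmp, Paths.get(targetPath), StandardCopyOption.REPLACE_EXISTING)
        } else {
          // failure: discard the half-written state
          Files.deleteIfExists(tmp)
        }
      }
    }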

http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 9a9940b..2a25255 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -156,7 +156,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll
   }
 
   override def afterAll = {
-    dropTable
+//    dropTable
   }
 
   def dropTable = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
new file mode 100644
index 0000000..945542a
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'=
',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  test("overwriting static partition table for date partition column on insert query") {
+    sql(
+      """
+        | CREATE TABLE staticpartitiondateinsert (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (projectenddate Date,doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29
00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary
from originTable where projectenddate=cast('2016-06-29' as Date)""")
+//    sql(s"""insert overwrite table partitiondateinsert  select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29'
as Date)"),
+      sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable where projectenddate=cast('2016-06-29' as Date)"))
+  }
+
+  test("overwriting partition table for date partition column on insert query") {
+    sql(
+      """
+        | CREATE TABLE partitiondateinsert (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (projectenddate Date,doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable""")
+    sql(s"""insert overwrite table partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable where projectenddate=cast('2016-06-29' as Date)""")
+    checkAnswer(sql("select * from partitiondateinsert"),
+      sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj
from originTable where projectenddate=cast('2016-06-29' as Date)"))
+  }
+
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists partitiondateinsert")
+    sql("drop table if exists staticpartitiondateinsert")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 09d9da1..d377c4d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -94,10 +94,11 @@ class CarbonDropPartitionRDD(
  * @param tablePath
  * @param segments segments to be merged
  */
-class CarbonDropPartitionRollbackRDD(
+class CarbonDropPartitionCommitRDD(
     sc: SparkContext,
     tablePath: String,
     segments: Seq[String],
+    success: Boolean,
     uniqueId: String)
   extends CarbonRDD[String](sc, Nil) {
 
@@ -112,7 +113,7 @@ class CarbonDropPartitionRollbackRDD(
       val split = theSplit.asInstanceOf[CarbonDropPartition]
       logInfo("Commit partition information from : " + split.segmentPath)
 
-      new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, false)
+      new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success)
 
       var havePair = false
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index a883735..69be362 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command.management
 
 import java.text.SimpleDateFormat
+import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -34,7 +35,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
@@ -46,6 +47,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
@@ -53,7 +55,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
@@ -67,7 +69,7 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
@@ -567,14 +569,18 @@ case class CarbonLoadDataCommand(
           carbonLoadModel,
           sparkSession)
     }
-    Dataset.ofRows(
-      sparkSession,
+    val convertedPlan =
       CarbonReflectionUtils.getInsertIntoCommand(
         convertRelation,
         partition,
         query,
-        isOverwriteTable,
-        false))
+        false,
+        false)
+    if (isOverwriteTable && partition.nonEmpty) {
+      overwritePartition(sparkSession, table, convertedPlan)
+    } else {
+      Dataset.ofRows(sparkSession, convertedPlan)
+    }
   }
 
   private def convertToLogicalRelation(
@@ -597,14 +603,18 @@ case class CarbonLoadDataCommand(
     }
     val partitionSchema =
       StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(field =>
-      metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
-
+        metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
+    val overWriteLocal = if (overWrite && partition.nonEmpty) {
+      false
+    } else {
+      overWrite
+    }
     val dataSchema =
       StructType(metastoreSchema
         .filterNot(field => partitionSchema.contains(field.name)))
     val options = new mutable.HashMap[String, String]()
     options ++= catalogTable.storage.properties
-    options += (("overwrite", overWrite.toString))
+    options += (("overwrite", overWriteLocal.toString))
     options += (("onepass", loadModel.getUseOnePass.toString))
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
@@ -621,6 +631,75 @@ case class CarbonLoadDataCommand(
       Some(catalogTable))
   }
 
+  /**
+   * Overwrite the partition data if static partitions are specified.
+   * @param sparkSession
+   * @param table
+   * @param logicalPlan
+   */
+  private def overwritePartition(
+      sparkSession: SparkSession,
+      table: CarbonTable,
+      logicalPlan: LogicalPlan): Unit = {
+    sparkSession.sessionState.catalog.listPartitions(
+      TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
+      Some(partition.map(f => (f._1, f._2.get))))
+    val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
+    val uniqueId = System.currentTimeMillis().toString
+    val segments = new SegmentStatusManager(
+      table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
+    try {
+      // First drop the partitions from partition mapper files of each segment
+      new CarbonDropPartitionRDD(
+        sparkSession.sparkContext,
+        table.getTablePath,
+        segments.asScala,
+        partitionNames.toSeq,
+        uniqueId).collect()
+    } catch {
+      case e: Exception =>
+        // roll back the drop partitions from carbon store
+        new CarbonDropPartitionCommitRDD(
+          sparkSession.sparkContext,
+          table.getTablePath,
+          segments.asScala,
+          false,
+          uniqueId).collect()
+        throw e
+    }
+
+    try {
+      Dataset.ofRows(sparkSession, logicalPlan)
+    } catch {
+      case e: Exception =>
+        // roll back the drop partitions from carbon store
+        new CarbonDropPartitionCommitRDD(
+          sparkSession.sparkContext,
+          table.getTablePath,
+          segments.asScala,
+          false,
+          uniqueId).collect()
+        throw e
+    }
+    // Commit the removed partitions in carbon store.
+    new CarbonDropPartitionCommitRDD(
+      sparkSession.sparkContext,
+      table.getTablePath,
+      segments.asScala,
+      true,
+      uniqueId).collect()
+    // Update the loadstatus with update time to clear cache from driver.
+    val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
+      .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
+    CarbonUpdateUtil.updateTableMetadataStatus(
+      segmentSet,
+      table,
+      uniqueId,
+      true,
+      new util.ArrayList[String])
+    DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
+  }
+
   def getDataFrameWithTupleID(): DataFrame = {
     val fields = dataFrame.get.schema.fields
     import org.apache.spark.sql.functions.udf
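
Note on overwritePartition above: it reuses the drop-partition protocol. The named partitions are first dropped from each segment's partition map (CarbonDropPartitionRDD), the insert is run, and only then is the change committed; if either step fails, CarbonDropPartitionCommitRDD runs with success = false to roll back and the exception is rethrown. A stand-alone sketch of that control flow (dropPartitions, insertData and commit are illustrative stand-ins, not CarbonData APIs):

    // Illustrative control flow only; in the patch the real work is done by
    // CarbonDropPartitionRDD, Dataset.ofRows and CarbonDropPartitionCommitRDD.
    object OverwriteFlowSketch {
      def run(dropPartitions: () => Unit, insertData: () => Unit, commit: Boolean => Unit): Unit = {
        try {
          dropPartitions()   // mark the old partition mappings for removal
        } catch {
          case e: Exception =>
            commit(false)    // roll back the uncommitted partition-map changes
            throw e
        }
        try {
          insertData()       // load the new data for the partition
        } catch {
          case e: Exception =>
            commit(false)    // roll back on load failure as well
            throw e
        }
        commit(true)         // publish the new partition maps
      }
    }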

http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
index b787aa7..fd59587 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.rdd.{CarbonDropPartitionRDD, CarbonDropPartitionRollbackRDD}
+import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
 
 /**
  * Drop the partitions from hive and carbon store. It drops the partitions in following steps
@@ -118,20 +118,28 @@ case class CarbonStandardAlterTableDropPartition(
       } catch {
         case e: Exception =>
           // roll back the drop partitions from carbon store
-          new CarbonDropPartitionRollbackRDD(sparkSession.sparkContext,
+          new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
             table.getTablePath,
             segments.asScala,
+            false,
             uniqueId).collect()
           throw e
       }
+      // commit the drop partitions from carbon store
+      new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
+        table.getTablePath,
+        segments.asScala,
+        true,
+        uniqueId).collect()
+      // Update the loadstatus with update time to clear cache from driver.
       val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
         .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
-      CarbonUpdateUtil
-        .updateTableMetadataStatus(segmentSet,
-          table,
-          uniqueId,
-          true,
-          new util.ArrayList[String])
+      CarbonUpdateUtil.updateTableMetadataStatus(
+        segmentSet,
+        table,
+        uniqueId,
+        true,
+        new util.ArrayList[String])
       DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
     } finally {
       AlterTableUtil.releaseLocks(locks)

