carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [40/50] [abbrv] carbondata git commit: [CARBONDATA-1972][PARTITION] Fix compaction after update of partition table.
Date Tue, 09 Jan 2018 04:02:08 GMT
[CARBONDATA-1972][PARTITION] Fix compaction after update of partition table.

When an update rewrites all of the data, every old segment needs to be marked as deleted. This was
not happening for partition tables. This PR fixes it.

This closes #1752
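
The scenario, in short: if an UPDATE touches every row of a segment, that segment must be 'Marked for Delete' when the table status is updated; for standard partition tables that flag was lost, so a later compaction still saw the stale segment. A minimal Spark SQL sketch of the flow (not the committed test; it assumes a CarbonSession named spark and a CSV at /tmp/sample.csv whose header row is id,name,city):

    import org.apache.spark.sql.SparkSession

    def reproduce(spark: SparkSession): Unit = {
      spark.sql("DROP TABLE IF EXISTS part_update")
      spark.sql(
        """CREATE TABLE part_update (id INT, name STRING)
          |PARTITIONED BY (city STRING)
          |STORED BY 'org.apache.carbondata.format'""".stripMargin)
      spark.sql("LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE part_update")
      // Update every row: the original segment ends up fully deleted and has to be
      // marked for delete in the table status before compaction runs.
      spark.sql("UPDATE part_update SET (name) = ('updated')").show()
      spark.sql("DELETE FROM part_update WHERE name = 'updated'").show()
      spark.sql("ALTER TABLE part_update COMPACT 'major'")
    }

The regression test added below does the same thing against 100_olap.csv with the full OLAP schema.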


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cdf2c029
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cdf2c029
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cdf2c029

Branch: refs/heads/branch-1.3
Commit: cdf2c0297f441b1f6070f076d3b67c9acaf86da8
Parents: 3c8031b
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Jan 3 09:16:36 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Jan 5 19:36:26 2018 +0800

----------------------------------------------------------------------
 .../hadoop/api/CarbonOutputCommitter.java       | 15 ++++++++----
 .../hadoop/api/CarbonTableOutputFormat.java     |  8 +++++++
 ...andardPartitionTableCompactionTestCase.scala | 15 +++++++++++-
 .../command/carbonTableSchemaCommon.scala       |  3 ++-
 .../management/CarbonLoadDataCommand.scala      |  3 +++
 .../CarbonProjectForDeleteCommand.scala         | 25 ++++++++++----------
 .../CarbonProjectForUpdateCommand.scala         | 10 ++++----
 .../command/mutation/DeleteExecution.scala      | 17 +++++++------
 .../datasources/CarbonFileFormat.scala          | 14 +++++++----
 9 files changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 97d5a7f..d0a5fd9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -18,9 +18,7 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -100,12 +98,19 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       mergeCarbonIndexFiles(segmentPath);
       String updateTime =
           context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
+      String segmentsToBeDeleted =
+          context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
+      List<String> segmentDeleteList = Arrays.asList(segmentsToBeDeleted.split(","));
       if (updateTime != null) {
         Set<String> segmentSet = new HashSet<>(
             new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
                 .getValidAndInvalidSegments().getValidSegments());
-        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
-            new ArrayList<String>());
+        CarbonUpdateUtil.updateTableMetadataStatus(
+            segmentSet,
+            carbonTable,
+            updateTime,
+            true,
+            segmentDeleteList);
       }
     } else {
       CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
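
On the committer side the change is small: at commit time the job configuration is asked for the new property, the comma-separated value is split into a list, and that list replaces the empty ArrayList previously passed to CarbonUpdateUtil.updateTableMetadataStatus. A standalone Scala sketch of the parsing step (the property key is the one defined below in CarbonTableOutputFormat; the Configuration here is created locally for illustration, in the committer it comes from the TaskAttemptContext):

    import java.util.{Arrays, List => JList}
    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()
    conf.set("mapreduce.carbontable.segments.to.be.removed", "0,1")

    val segmentsToBeDeleted = conf.get("mapreduce.carbontable.segments.to.be.removed", "")
    // Same behaviour as the Java code: an empty property still yields a one-element
    // list containing "", because "".split(",") returns [""].
    val segmentDeleteList: JList[String] = Arrays.asList(segmentsToBeDeleted.split(","): _*)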

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index bd70e41..897c929 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -95,6 +95,14 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
    */
   public static final String UPADTE_TIMESTAMP = "mapreduce.carbontable.update.timestamp";
 
+  /**
+   * During update query we first delete the old data and then add updated data to new segment, so
+   * sometimes there is a chance that complete segments needs to removed during deletion. We should
+   * do 'Mark for delete' for those segments during table status update.
+   */
+  public static final String SEGMENTS_TO_BE_DELETED =
+      "mapreduce.carbontable.segments.to.be.removed";
+
   private static final Log LOG = LogFactory.getLog(CarbonTableOutputFormat.class);
 
   private CarbonOutputCommitter committer;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 3e6cd26..298b793 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -193,7 +193,19 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
       .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
   }
 
-  override def afterAll = {
+  test("Compaction after update in partition table") {
+    sql("create table compactionupdatepartition (imei string,deviceInformationId int,MAC
string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize
string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode
string,internalModels string, deliveryTime string, channelsId string, channelsName string
, deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict
string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId
string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string,
ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion
string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string,
Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string,
Active_operatorsVersion string, Active_phonePADPa
 rtitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR
string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string,
Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion
string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string,
Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string,
Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId
string, gamePointDescription string,gamePointId double) partitioned by(contractNumber BigInt)
STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE compactionupdatepartition
options ('DELIMITER'=',', 'QUOTECHAR'='', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE compactionupdatepartition
options ('DELIMITER'=',', 'QUOTECHAR'='', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE compactionupdatepartition
options ('DELIMITER'=',', 'QUOTECHAR'='', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE compactionupdatepartition
options ('DELIMITER'=',', 'QUOTECHAR'='', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""insert into compactionupdatepartition select * from compactionupdatepartition""")
+    sql("update compactionupdatepartition set(AMSize)=('8RAM size')").show()
+    sql("delete from compactionupdatepartition where AMSize ='8RAM size'").show()
+    sql(s"""alter table compactionupdatepartition compact 'major'""").collect
+  }
+
+    override def afterAll = {
     dropTable
   }
 
@@ -206,6 +218,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql("drop table if exists partitionmajor")
     sql("drop table if exists staticpartition")
     sql("drop table if exists staticpartitioncompaction")
+    sql("drop table if exists compactionupdatepartition")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index b76cfcf..37eea60 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -132,7 +132,8 @@ case class AlterTableModel(
 case class UpdateTableModel(
     isUpdate: Boolean,
     updatedTimeStamp: Long,
-    var executorErrors: ExecutionErrors)
+    var executorErrors: ExecutionErrors,
+    deletedSegments: Seq[String])
 
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,
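
UpdateTableModel gains a fourth field so the list of fully deleted segments can travel from the update command down to the load flow. A construction sketch (ExecutionErrors is the existing case class referenced in the field above; the import assumes it sits in the same package as UpdateTableModel, and the errors parameter stands for the value the update command already tracks):

    import org.apache.spark.sql.execution.command.{ExecutionErrors, UpdateTableModel}

    def buildModel(errors: ExecutionErrors, deletedSegments: Seq[String]): UpdateTableModel =
      UpdateTableModel(
        isUpdate = true,
        updatedTimeStamp = System.currentTimeMillis(),
        executorErrors = errors,
        deletedSegments = deletedSegments)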

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 4c1e748..01bb5b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -711,6 +711,9 @@ case class CarbonLoadDataCommand(
     options ++= this.options
     if (updateModel.isDefined) {
       options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
+      if (updateModel.get.deletedSegments.nonEmpty) {
+        options += (("segmentsToBeDeleted", updateModel.get.deletedSegments.mkString(",")))
+      }
     }
     val hdfsRelation = HadoopFsRelation(
       location = catalog,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index e54ea9e..7022566 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -70,28 +70,27 @@ private[sql] case class CarbonProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      if (DeleteExecution.deleteDeltaExecution(
+      DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
         sparkSession,
         dataRdd,
         timestamp,
         isUpdateOperation = false,
-        executorErrors)) {
-        // call IUD Compaction.
-        HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
-          isUpdateOperation = false)
+        executorErrors)
+      // call IUD Compaction.
+      HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
+        isUpdateOperation = false)
 
 
-        if (executorErrors.failureCauses != FailureCauses.NONE) {
-          throw new Exception(executorErrors.errorMsg)
-        }
-
-        // trigger post event for Delete from table
-        val deleteFromTablePostEvent: DeleteFromTablePostEvent =
-          DeleteFromTablePostEvent(sparkSession, carbonTable)
-        OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
+      if (executorErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executorErrors.errorMsg)
       }
+
+      // trigger post event for Delete from table
+      val deleteFromTablePostEvent: DeleteFromTablePostEvent =
+        DeleteFromTablePostEvent(sparkSession, carbonTable)
+      OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
     } catch {
       case e: HorizontalCompactionException =>
         LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 75008ad..bd53a66 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -91,7 +91,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
       // do delete operation.
-      DeleteExecution.deleteDeltaExecution(
+      val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
         sparkSession,
@@ -111,7 +111,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
         plan,
         sparkSession,
         currentTime,
-        executionErrors)
+        executionErrors,
+        segmentsToBeDeleted)
 
       if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
@@ -165,7 +166,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       plan: LogicalPlan,
       sparkSession: SparkSession,
       currentTime: Long,
-      executorErrors: ExecutionErrors): Unit = {
+      executorErrors: ExecutionErrors,
+      deletedSegments: Seq[String]): Unit = {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
       val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -209,7 +211,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
       case _ => sys.error("")
     }
 
-    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
+    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors, deletedSegments)
 
     val header = getHeader(carbonRelation, plan)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 00d6657..994880c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -51,6 +51,10 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
 
+  /**
+   * generate the delete delta files in each segment as per the RDD.
+   * @return it gives the segments which needs to be deleted.
+   */
   def deleteDeltaExecution(
       databaseNameOp: Option[String],
       tableName: String,
@@ -58,7 +62,7 @@ object DeleteExecution {
       dataRdd: RDD[Row],
       timestamp: String,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors): Boolean = {
+      executorErrors: ExecutionErrors): Seq[String] = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -66,8 +70,8 @@ object DeleteExecution {
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val factPath = carbonTablePath.getFactDir
+    var segmentsTobeDeleted = Seq.empty[String]
 
-    var deleteStatus = true
     val deleteRdd = if (isUpdateOperation) {
       val schema =
         org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
@@ -92,7 +96,7 @@ object DeleteExecution {
 
     // if no loads are present then no need to do anything.
     if (keyRdd.partitions.length == 0) {
-      return true
+      return segmentsTobeDeleted
     }
     val blockMappingVO =
       carbonInputFormat.getBlockRowCount(
@@ -132,7 +136,7 @@ object DeleteExecution {
 
     // if no loads are present then no need to do anything.
     if (res.isEmpty) {
-      return true
+      return segmentsTobeDeleted
     }
 
     // update new status file
@@ -154,7 +158,6 @@ object DeleteExecution {
             }
           }
           else {
-            deleteStatus = false
             // In case of failure , clean all related delete delta files
             CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
             LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName
}")
@@ -179,7 +182,7 @@ object DeleteExecution {
       val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
         .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
 
-
+      segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala
 
       // this is delete flow so no need of putting timestamp in the status file.
       if (CarbonUpdateUtil
@@ -310,7 +313,7 @@ object DeleteExecution {
       resultIter
     }
 
-    true
+    segmentsTobeDeleted
   }
 
   private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
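
deleteDeltaExecution now reports which segments became fully deleted instead of a Boolean success flag, and the update flow forwards that list so those segments get 'Marked for Delete' in the table status. A caller-side sketch (parameter names as in the diff; spark, dataRdd and errors stand for values the mutation command already has in scope, and the database/table names are illustrative):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.ExecutionErrors
    import org.apache.spark.sql.execution.command.mutation.DeleteExecution

    def runDelete(spark: SparkSession, dataRdd: RDD[Row], errors: ExecutionErrors,
        timestamp: String): Seq[String] = {
      // Returns the segments whose rows were all deleted; an empty Seq means nothing
      // extra has to be marked for delete when the table status is updated.
      DeleteExecution.deleteDeltaExecution(
        databaseNameOp = Some("default"),
        tableName = "compactionupdatepartition",
        sparkSession = spark,
        dataRdd = dataRdd,
        timestamp = timestamp,
        isUpdateOperation = true,
        executorErrors = errors)
    }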

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 7f05cef..15deae1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -109,15 +109,21 @@ with Serializable {
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
     // Set the update timestamp if user sets in case of update query. It needs to be updated
     // in load status update time
-    val updateTimeStamp = options.getOrElse("updatetimestamp", null)
-    if (updateTimeStamp != null) {
-      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp)
-      model.setFactTimeStamp(updateTimeStamp.toLong)
+    val updateTimeStamp = options.get("updatetimestamp")
+    if (updateTimeStamp.isDefined) {
+      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+      model.setFactTimeStamp(updateTimeStamp.get.toLong)
     }
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {
       conf.set("carbon.staticpartition", staticPartition)
     }
+    // In case of update query there is chance to remove the older segments, so here we can set
+    // the to be deleted segments to mark as delete while updating tablestatus
+    val segemntsTobeDeleted = options.get("segmentsToBeDeleted")
+    if (segemntsTobeDeleted.isDefined) {
+      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
+    }
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {
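
On the write path the Scala layer completes the loop: the segmentsToBeDeleted option built by CarbonLoadDataCommand is copied into the job configuration under CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED so the output committer can read it back at commit time. A small sketch of that hand-off (the options map is illustrative; in prepareWrite it is the option map passed down from the load command):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat

    val conf = new Configuration()
    val options = Map(
      "updatetimestamp" -> "1515468000000",                    // illustrative value
      "segmentsToBeDeleted" -> Seq("0", "1").mkString(","))

    // Mirrors the diff: the property is set only when the option is present.
    options.get("segmentsToBeDeleted").foreach { segments =>
      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segments)
    }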

