carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-1886] Check and Delete stale segment folders on new load
Date Wed, 13 Dec 2017 04:40:26 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 970a32a26 -> 5e3aec43e


[CARBONDATA-1886] Check and Delete stale segment folders on new load

Segment folders are not deleted if the corresponding entry is not available in the table status
file. Due to this, queries return a higher record count than the actual count.

This closes #1646


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5e3aec43
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5e3aec43
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5e3aec43

Branch: refs/heads/master
Commit: 5e3aec43ed65f48a63a396aac016e1ac06ef8589
Parents: 970a32a
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Tue Dec 12 16:25:31 2017 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Wed Dec 13 10:13:03 2017 +0530

----------------------------------------------------------------------
 .../spark/testsuite/dataload/TestLoadDataGeneral.scala | 13 ++++++++++++-
 .../command/management/CarbonLoadDataCommand.scala     |  9 ++++++---
 .../processing/loading/TableProcessingOperations.java  |  3 +--
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index e3d497a..49f3c5e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
@@ -191,6 +190,18 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("test if stale folders are deleting on data load") {
+    sql("drop table if exists stale")
+    sql("create table stale(a string) stored by 'carbondata'")
+    sql("insert into stale values('k')")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "stale")
+    val tableStatusFile = new CarbonTablePath(null,
+      carbonTable.getTablePath).getTableStatusFilePath
+    FileFactory.getCarbonFile(tableStatusFile).delete()
+    sql("insert into stale values('k')")
+    checkAnswer(sql("select * from stale"), Row("k"))
+  }
+
   override def afterAll {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index ebdaa33..9f6fce1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
@@ -42,9 +42,9 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -139,7 +139,10 @@ case class CarbonLoadDataCommand(
         carbonLoadModel,
         hadoopConf
       )
-
+      // Delete stale segment folders that are not in table status but are physically present in
+      // the Fact folder
+      LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+      TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
       try {
         val operationContext = new OperationContext
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5e3aec43/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index cb53d6e..e2be79c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -74,8 +74,7 @@ public class TableProcessingOperations {
                 CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
             boolean found = false;
             for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
-                  .equals(partitionCount)) {
+              if (details[j].getLoadName().equals(segmentId)) {
                 found = true;
                 break;
               }


Mime
View raw message