carbondata-commits mailing list archives

From indhumuthumurug...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4072] Clean files command is not deleting .segment files for the segments added through alter table add segment query.
Date Fri, 18 Dec 2020 09:50:00 GMT
This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e347ca9  [CARBONDATA-4072] Clean files command is not deleting .segment files for the segments added through alter table add segment query.
e347ca9 is described below

commit e347ca9fa185566ed47b83bb60c0bc85dc4f6a2d
Author: Karan980 <karan.bajwa980@gmail.com>
AuthorDate: Tue Dec 8 16:13:16 2020 +0530

    [CARBONDATA-4072] Clean files command is not deleting .segment files for the
    segments added through alter table add segment query.
    
    Why is this PR needed?
    Clean files command is not deleting .segment files present at
    tablePath/Metadata/Segments for the compacted/marked for delete segments
    added through alter table add segment query.
    
    What changes were proposed in this PR?
    Added code for deleting the segment file for the segments added
    through alter table add segment query.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4048
---
 .../carbondata/core/metadata/SegmentFileStore.java |  11 +-
 .../carbondata/core/util/CarbonTestUtil.java       |  12 ++
 .../carbondata/core/util/DeleteLoadFolders.java    |  22 ++-
 .../command/management/CarbonAddLoadCommand.scala  |  13 +-
 .../testsuite/addsegment/AddSegmentTestCase.scala  |  48 +++---
 .../cleanfiles/TestCleanFileCommand.scala          | 179 ++++++++++++++++++++-
 6 files changed, 239 insertions(+), 46 deletions(-)
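
Before the diffs, a minimal repro of the bug sketched as a Scala test body (table name,
path, and segment id are illustrative, not taken from this commit):

    sql("CREATE TABLE t (a INT) STORED AS carbondata")
    sql("ALTER TABLE t ADD SEGMENT OPTIONS('path'='/tmp/external_seg', 'format'='carbon')")
    sql("DELETE FROM TABLE t WHERE SEGMENT.ID IN (1)") // segment becomes Marked For Delete
    sql("CLEAN FILES FOR TABLE t OPTIONS('force'='true')")
    // Before this fix, the added segment's .segment file under
    // tablePath/Metadata/Segments was left behind; with it, the file is removed.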

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index f8c8cb1..9e8be0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1112,6 +1112,13 @@ public class SegmentFileStore {
     cleanSegments(table, details, partitionSpecs, forceDelete);
   }
 
+  public static void deleteSegmentFile(String tablePath, Segment segment) throws Exception {
+    String segmentFilePath =
+        CarbonTablePath.getSegmentFilePath(tablePath, segment.getSegmentFileName());
+    // Deletes the physical segment file
+    FileFactory.deleteFile(segmentFilePath);
+  }
+
   /**
    * Deletes the segment file and its physical files like partition folders from disk
    * @param tablePath
@@ -1138,10 +1145,6 @@ public class SegmentFileStore {
       }
     }
     deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, tablePath);
-    String segmentFilePath =
-        CarbonTablePath.getSegmentFilePath(tablePath, segment.getSegmentFileName());
-    // Deletes the physical segment file
-    FileFactory.deleteFile(segmentFilePath);
   }
 
   /**
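
The .segment-file deletion is factored out of deleteSegment() into the new helper above, so
callers can remove the metadata file without touching physical data. A minimal usage sketch
in Scala (the imports and the in-scope table/load values are assumptions for illustration):

    // NOTE: the Segment import path varies across CarbonData versions
    // (core.datamap.Segment in older releases, core.index.Segment later).
    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.core.metadata.SegmentFileStore

    // Deletes only tablePath/Metadata/Segments/<segmentFileName>;
    // data and index files are left untouched.
    val segment = new Segment(load.getLoadName, load.getSegmentFile)
    SegmentFileStore.deleteSegmentFile(table.getTablePath, segment)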
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
index b7b2a9c..822c7fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.util;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -158,4 +160,14 @@ public class CarbonTestUtil {
     }
     return isLocalDictionaryGenerated;
   }
+
+  public static void copy(String oldLoc, String newLoc) throws IOException {
+    CarbonFile oldFolder = FileFactory.getCarbonFile(oldLoc);
+    FileFactory.mkdirs(newLoc, FileFactory.getConfiguration());
+    CarbonFile[] oldFiles = oldFolder.listFiles();
+    for (CarbonFile file : oldFiles) {
+      Files.copy(Paths.get(file.getParentFile().getPath(), file.getName()),
+          Paths.get(newLoc, file.getName()));
+    }
+  }
 }
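
CarbonTestUtil gains a shared copy() helper (replacing the private copy() that is removed
from AddSegmentTestCase further below), so multiple suites can stage a segment folder at an
external location. A usage sketch (the destination path is illustrative):

    import org.apache.carbondata.core.util.CarbonTestUtil
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Copy segment 1's folder out of the store so it can later be
    // re-attached via ALTER TABLE ... ADD SEGMENT.
    val segPath = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
    CarbonTestUtil.copy(segPath, "/tmp/addsegtest")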
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index 460b4e9..f8f2607 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -115,12 +115,17 @@ public final class DeleteLoadFolders {
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
     for (final LoadMetadataDetails oneLoad : loadDetails) {
-      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete, cleanStaleInProgress)) {
+      if (canDeleteThisLoad(oneLoad, isForceDelete, cleanStaleInProgress)) {
         try {
           if (oneLoad.getSegmentFile() != null) {
-            SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
-                new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()),
-                specs, updateStatusManager);
+            String tablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+            Segment segment = new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile());
+            // No need to delete physical data for external segments.
+            if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
+              SegmentFileStore.deleteSegment(tablePath, segment, specs, updateStatusManager);
+            }
+            // delete segment files for all segments.
+            SegmentFileStore.deleteSegmentFile(tablePath, segment);
           } else {
             String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
             boolean status = false;
@@ -182,15 +187,6 @@ public final class DeleteLoadFolders {
     return false;
   }
 
-  private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
-      boolean isForceDelete, boolean cleanStaleInProgress) {
-    // Check if the segment is added externally and path is set then do not delete it
-    if (oneLoad.getPath() == null || oneLoad.getPath().equalsIgnoreCase("NA")) {
-      return canDeleteThisLoad(oneLoad,  isForceDelete, cleanStaleInProgress);
-    }
-    return false;
-  }
-
   private static Boolean canDeleteThisLoad(LoadMetadataDetails oneLoad,
       boolean isForceDelete, boolean cleanStaleInProgress) {
     /*
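
Net effect of the rewritten branch, restated as a Scala sketch (semantics read off the diff
above): physical data is deleted only for segments stored inside the table path, while the
.segment metadata file is now deleted for every eligible segment, including ones attached
through alter table add segment:

    // oneLoad, tablePath, segment, specs, updateStatusManager as in the Java code above.
    val isExternal = oneLoad.getPath != null && !oneLoad.getPath.equalsIgnoreCase("NA")
    if (!isExternal) {
      // Physical data lives inside the table path; safe to delete.
      SegmentFileStore.deleteSegment(tablePath, segment, specs, updateStatusManager)
    }
    // The .segment file always lives under tablePath/Metadata, so always remove it.
    SegmentFileStore.deleteSegmentFile(tablePath, segment)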
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 4defda0..8f153a4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -100,11 +100,14 @@ case class CarbonAddLoadCommand(
 
     // If a path is already added then we should block the adding of the same path again.
     val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-    // If the segment has been already loaded from the same path and its status is SUCCESS or
-    // PARTIALLY_SUCCESS, throw an exception as we should block the adding of the same path again.
-    if (allSegments.exists(a => a.getPath != null && a.getPath.equalsIgnoreCase(inputPath) &&
-      (a.getSegmentStatus == SegmentStatus.SUCCESS ||
-        a.getSegmentStatus == SegmentStatus .LOAD_PARTIAL_SUCCESS))) {
+    // If the segment has been already loaded from the same path or the segment is already present
+    // in the table and its status is SUCCESS or PARTIALLY_SUCCESS, throw an exception as we should
+    // block the adding of the same path again.
+    if (allSegments.exists(a => ((a.getPath != null && a.getPath.equalsIgnoreCase(inputPath)) ||
+                                 CarbonTablePath.getSegmentPath(carbonTable.getTablePath,
+                                   a.getLoadName).equalsIgnoreCase(inputPath)) &&
+                                (a.getSegmentStatus == SegmentStatus.SUCCESS ||
+                                 a.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))) {
       throw new AnalysisException(s"path already exists in table status file, can not add same " +
                                   s"segment path repeatedly: $inputPath")
     }
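
The strengthened check now also rejects a path that points at a segment directory already
inside the table, not just a previously added external path. A sketch of the resulting
behaviour, mirroring the new test added to AddSegmentTestCase below:

    val internalPath = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
    val ex = intercept[Exception] {
      sql(s"alter table addsegment1 add segment " +
          s"options('path'='$internalPath', 'format'='carbon')").collect()
    }
    assert(ex.getMessage.contains("can not add same segment path repeatedly"))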
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index ad3b50f..1c8700b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -17,7 +17,6 @@
 package org.apache.carbondata.spark.testsuite.addsegment
 
 import java.io.File
-import java.nio.file.{Files, Paths}
 
 import scala.io.Source
 
@@ -35,7 +34,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataTypes, Field}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
 import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
@@ -64,7 +63,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
     val newPath = storeLocation + "/" + "addsegtest"
     val newPathWithLineSeparator = storeLocation + "/" + "addsegtest/"
-    copy(path, newPath)
+    CarbonTestUtil.copy(path, newPath)
     sql("delete from table addsegment1 where segment.id in (1)")
     sql("clean files for table addsegment1")
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
@@ -94,7 +93,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
     val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
     val newPath = storeLocation + "/" + "addsegtest"
-    copy(path, newPath)
+    CarbonTestUtil.copy(path, newPath)
     sql("delete from table addsegment1 where segment.id in (1)")
     sql("clean files for table addsegment1")
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
@@ -122,7 +121,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
     val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
     val newPath = storeLocation + "/" + "addsegtest"
-    copy(path, newPath)
+    CarbonTestUtil.copy(path, newPath)
     sql("delete from table addsegment1 where segment.id in (1)")
     sql("clean files for table addsegment1")
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
@@ -153,7 +152,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
     val newPath = storeLocation + "/" + "addsegtest"
     for (i <- 0 until 10) {
-      copy(path, newPath + i)
+      CarbonTestUtil.copy(path, newPath + i)
     }
 
     sql("delete from table addsegment1 where segment.id in (1)")
@@ -189,7 +188,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
     val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
     val newPath = storeLocation + "/" + "addsegtest"
-    copy(path, newPath)
+    CarbonTestUtil.copy(path, newPath)
     sql("delete from table addsegment1 where segment.id in (1)")
     sql("clean files for table addsegment1")
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
@@ -226,7 +225,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val table = CarbonEnv.getCarbonTable(None, "addsegment2") (sqlContext.sparkSession)
     val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
     val newPath = storeLocation + "/" + "addsegtest"
-    copy(path, newPath)
+    CarbonTestUtil.copy(path, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
 
     val ex = intercept[Exception] {
@@ -248,7 +247,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
 
     sql(s"alter table addsegment1 add segment " +
@@ -284,7 +283,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
 
     sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')")
     val exception1 = intercept[MalformedCarbonCommandException](sql(
@@ -311,7 +310,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
     sql(
       """
@@ -342,7 +341,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
 
     sql(s"alter table addsegment1 add segment " +
@@ -446,7 +445,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
 
     sql(s"alter table addsegment1 add segment " +
@@ -468,7 +467,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
 
     val res1 = sql("select empname, deptname from addsegment1 where deptno=10")
 
@@ -504,7 +503,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path.toString, newPath)
+    CarbonTestUtil.copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
 
     sql("alter table addsegment1 add segment " +
@@ -740,7 +739,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     val path1 = table1.location
     val newPath1 = storeLocation + "/" + pathName
     FileFactory.deleteAllFilesOfDir(new File(newPath1))
-    copy(path1.toString, newPath1)
+    CarbonTestUtil.copy(path1.toString, newPath1)
     newPath1
   }
 
@@ -964,14 +963,17 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop table $tableName")
   }
 
-  def copy(oldLoc: String, newLoc: String): Unit = {
-    val oldFolder = FileFactory.getCarbonFile(oldLoc)
-    FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
-    val oldFiles = oldFolder.listFiles
-    for (file <- oldFiles) {
-      Files.copy(Paths.get(file.getParentFile.getPath, file.getName),
-        Paths.get(newLoc, file.getName))
+  test("Test Adding invalid segment which is already present inside the table") {
+    createCarbonTable()
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val ex = intercept[Exception] {
+      sql("alter table addsegment1 add segment " +
+          s"options('path'='$path', 'format'='carbon')").collect()
     }
+    assert(ex.getMessage.contains("can not add same segment path repeatedly"))
   }
 
   def getDataSize(path: String): String = {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
index 5c288d7..8812253 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
@@ -21,14 +21,19 @@ import java.io.{File, PrintWriter}
 
 import scala.io.Source
 
+import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
 
 class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -306,6 +311,161 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
       .removeProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS)
   }
 
+  test("Test clean files on segments after compaction and deletion of segments on" +
+       " partition table with mixed formats") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("DROP TABLE IF EXISTS partition_carbon_table")
+    sql("DROP TABLE IF EXISTS partition_parquet_table")
+    sql("""CREATE TABLE partition_carbon_table (id Int, vin String, logdate Date,phonenumber Long,
+         area String, salary Int) PARTITIONED BY (country String)
+          STORED AS carbondata""".stripMargin)
+    for (i <- 0 until 5) {
+      sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data_example.csv'
+             | into table partition_carbon_table""".stripMargin)
+    }
+    sql("""CREATE TABLE partition_parquet_table (id Int, vin String, logdate Date,phonenumber Long,
+         country String, area String, salary Int)
+         using parquet PARTITIONED BY (country)""".stripMargin)
+    sql(s"""insert into partition_parquet_table select * from partition_carbon_table""")
+    val parquetRootPath = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("partition_parquet_table")).location
+    sql(s"alter table partition_carbon_table add segment options ('path'='$parquetRootPath', " +
+        "'format'='parquet', 'partition'='country:string')")
+    sql("alter table partition_carbon_table compact 'minor'").collect()
+    sql("delete from table partition_carbon_table where segment.id in (7,8)")
+    sql("clean files for table partition_carbon_table OPTIONS('force'='true')")
+    val table = CarbonEnv
+      .getCarbonTable(None, "partition_carbon_table") (sqlContext.sparkSession)
+    val segmentsFilePath = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+    val files = new File(segmentsFilePath).listFiles()
+    assert(files.length == 6)
+    val segmentIds = files.map(file => getSegmentIdFromSegmentFilePath(file.getAbsolutePath))
+    assert(segmentIds.contains("0.1"))
+    assert(!segmentIds.contains("7"))
+    assert(!segmentIds.contains("8"))
+    sql("DROP TABLE IF EXISTS partition_carbon_table")
+    sql("DROP TABLE IF EXISTS partition_parquet_table")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+  }
+
+  test("Test clean files on segments(MFD/Compacted/inProgress) after compaction and" +
+       " deletion of segments") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv'
+         | INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    for (i <- 0 until 6) {
+      FileFactory.deleteAllFilesOfDir(new File(newPath + i))
+      CarbonTestUtil.copy(path, newPath + i)
+    }
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    for (i <- 0 until 6) {
+      sql(s"alter table addsegment1 add segment " +
+          s"options('path'='${ newPath + i }', 'format'='carbon')").collect()
+    }
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql("alter table addsegment1 compact 'minor'").collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql("clean files for table addsegment1 OPTIONS('force'='true')")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql(s"alter table addsegment1 add segment " +
+        s"options('path'='${ newPath + 0 }', 'format'='carbon')").collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(90)))
+    sql("delete from table addsegment1 where segment.id in (8)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(80)))
+    sql("clean files for table addsegment1 OPTIONS('force'='true')")
+    sql(s"alter table addsegment1 add segment " +
+        s"options('path'='${ newPath + 0 }', 'format'='carbon')").collect()
+    // testing for in progress segments
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(table.getTablePath)
+    val segments = SegmentStatusManager.readTableStatusFile(tableStatusPath)
+    segments.foreach(segment => if (segment.getLoadName.equals("9")) {
+      segment.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS)
+    })
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, segments)
+    sql("clean files for table addsegment1 OPTIONS('force'='true','stale_inprogress'='true')")
+    val segmentsFilePath = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+    val files = new File(segmentsFilePath).listFiles()
+    assert(files.length == 2)
+    val segmentIds = files.map(file => getSegmentIdFromSegmentFilePath(file.getAbsolutePath))
+    assert(segmentIds.contains("0.1"))
+    assert(segmentIds.contains("4.1"))
+    assert(!segmentIds.contains("8"))
+    assert(!segmentIds.contains("9"))
+    for (i <- 0 until 6) {
+      val oldFolder = FileFactory.getCarbonFile(newPath + i)
+      assert(oldFolder.listFiles.length == 2,
+        "Older data present at external location should not be deleted")
+      FileFactory.deleteAllFilesOfDir(new File(newPath + i))
+    }
+    sql("drop table if exists addsegment1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+  }
+
+  test("Test clean files on segments(MFD/Compacted/inProgress) after deletion of segments" +
+       " present inside table path") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+    val newPath = table.getTablePath + "/internalPath"
+    CarbonTestUtil.copy(path, newPath)
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    sql(s"alter table addsegment1 add segment " +
+          s"options('path'='$newPath', 'format'='carbon')").collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    sql("delete from table addsegment1 where segment.id in (1)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    sql("clean files for table addsegment1 OPTIONS('force'='true')")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    val segmentsFilePath = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+    val files = new File(segmentsFilePath).listFiles()
+    assert(files.length == 1)
+    val segmentIds = files.map(file => getSegmentIdFromSegmentFilePath(file.getAbsolutePath))
+    assert(segmentIds.contains("0"))
+    assert(!segmentIds.contains("1"))
+    val oldFolder = FileFactory.getCarbonFile(newPath)
+    assert(oldFolder.listFiles.length == 2,
+        "Older data present at external location should not be deleted")
+    sql("drop table if exists addsegment1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED,
+        CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED_DEFAULT)
+    }
+
   def editTableStatusFile(carbonTablePath: String) : Unit = {
     // original table status file
     val f1 = new File(CarbonTablePath.getTableStatusFilePath(carbonTablePath))
@@ -355,6 +515,23 @@ class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
   }
 
+  /**
+   * gets segment id from given absolute segment file path
+   */
+  def getSegmentIdFromSegmentFilePath(segmentFilePath: String): String = {
+    val tempSegmentFileAbsolutePath = segmentFilePath.replace(CarbonCommonConstants
+      .WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR)
+    if (!StringUtils.isBlank(tempSegmentFileAbsolutePath)) {
+      val pathElements = tempSegmentFileAbsolutePath.split(CarbonCommonConstants
+        .FILE_SEPARATOR)
+      if (pathElements != null && pathElements.nonEmpty) {
+        val fileName = pathElements(pathElements.length - 1)
+        return DataFileUtil.getSegmentNoFromSegmentFile(fileName)
+      }
+    }
+    CarbonCommonConstants.INVALID_SEGMENT_ID
+  }
+
   def loadData() : Unit = {
     sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")
     sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1, "name"""")

