carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [49/50] [abbrv] carbondata git commit: [CARBONDATA-1999] Block drop table and delete streaming segment while streaming is in progress
Date Tue, 09 Jan 2018 04:02:17 GMT
[CARBONDATA-1999] Block drop table and delete streaming segment while streaming is in progress

1. Block dropping a table while streaming ingestion is in progress
2. Block deleting streaming segments

This closes #1773
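
For context, the user-facing effect of the two blocks, sketched in Scala (the
table name and the running-ingest setup are illustrative assumptions, not part
of the patch):

    import org.apache.spark.sql.SparkSession
    import scala.util.Try

    // Assumes a CarbonData streaming table `streaming.sensor_events` with an
    // active streaming query holding the table's streaming lock.
    def expectBlocked(spark: SparkSession): Unit = {
      // 1. DROP TABLE now fails while ingestion holds the streaming lock.
      assert(Try(spark.sql("DROP TABLE streaming.sensor_events")).isFailure)

      // 2. Deleting a segment whose status is STREAMING is reported as an
      //    invalid segment id instead of being marked for delete.
      assert(Try(spark.sql(
        "DELETE FROM TABLE streaming.sensor_events WHERE SEGMENT.ID IN (0)")).isFailure)
    }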


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1b72a02b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1b72a02b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1b72a02b

Branch: refs/heads/branch-1.3
Commit: 1b72a02be2f176bcc99dc12f7621a0623d39b973
Parents: 90921eb
Author: QiangCai <qiangcai@qq.com>
Authored: Mon Jan 8 11:14:00 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Jan 9 00:49:27 2018 +0800

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 42 ++++++-----
 .../command/table/CarbonDropTableCommand.scala  |  4 +
 .../sql/execution/strategy/DDLStrategy.scala    |  2 +-
 .../TestStreamingTableOperation.scala           | 77 ++++++++++++++------
 4 files changed, 85 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 7804ea8..e1fadcf 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -473,6 +473,7 @@ public class SegmentStatusManager {
    */
   private static List<String> updateDeletionStatus(List<String> loadIds,
      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) {
+    SegmentStatus segmentStatus = null;
     for (String loadId : loadIds) {
       boolean loadFound = false;
       // For each load id loop through data and if the
@@ -481,26 +482,29 @@ public class SegmentStatusManager {
       for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
 
         if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
-          // if the segment is compacted then no need to delete that.
-          if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
+          segmentStatus = loadMetadata.getSegmentStatus();
+          if (SegmentStatus.COMPACTED == segmentStatus) {
+            // if the segment is compacted then no need to delete that.
             LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          }
-          // if the segment status is in progress then no need to delete that.
-          if (SegmentStatus.INSERT_IN_PROGRESS == loadMetadata.getSegmentStatus()) {
+          } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus) {
+            // if the segment status is in progress then no need to delete that.
             LOG.error("Cannot delete the segment " + loadId + " which is load in progress");
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          }
-          // if the segment status is overwrite in progress, then no need to delete that.
-          if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == loadMetadata.getSegmentStatus()) {
-            LOG.error("Cannot delete the segemnt " + loadId + " which is load overwrite " +
+          } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus) {
+            // if the segment status is overwrite in progress, then no need to delete that.
+            LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " +
                     "in progress");
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          }
-          if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
+          } else if (SegmentStatus.STREAMING == segmentStatus) {
+            // if the segment status is streaming, the segment can't be deleted directly.
+            LOG.error("Cannot delete the segment " + loadId + " which is streaming in progress");
+            invalidLoadIds.add(loadId);
+            return invalidLoadIds;
+          } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) {
             loadFound = true;
             loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
             loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
@@ -535,17 +539,20 @@ public class SegmentStatusManager {
     // the metadata as deleted.
     boolean loadFound = false;
     String loadStartTimeString = "Load Start Time: ";
+    SegmentStatus segmentStatus = null;
     for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
       Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
       if (result < 0) {
-        if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
+        segmentStatus = loadMetadata.getSegmentStatus();
+        if (SegmentStatus.COMPACTED == segmentStatus) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment has been compacted.");
-          continue;
-        }
-        if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus() &&
-            SegmentStatus.INSERT_IN_PROGRESS != loadMetadata.getSegmentStatus() &&
-            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != loadMetadata.getSegmentStatus()) {
+        } else if (SegmentStatus.STREAMING == segmentStatus) {
+          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+              + "as the segment is streaming in progress.");
+        } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus &&
+            SegmentStatus.INSERT_IN_PROGRESS != segmentStatus &&
+            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != segmentStatus) {
           loadFound = true;
           updateSegmentMetadataDetails(loadMetadata);
           LOG.info("Info: " +
@@ -553,7 +560,6 @@ public class SegmentStatusManager {
               " Marked for Delete");
         }
       }
-
     }
 
     if (!loadFound) {
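
Both hunks above follow the same pattern: read the segment status once, then
walk an if/else-if chain so that only segments in none of the protected states
(COMPACTED, the two in-progress states, and now STREAMING) fall through to
MARKED_FOR_DELETE. A condensed Scala sketch of that guard; the status names
mirror org.apache.carbondata.core.statusmanager.SegmentStatus, while Segment
and the method shape are simplified stand-ins, not the real API:

    sealed trait SegmentStatus
    case object Compacted extends SegmentStatus
    case object InsertInProgress extends SegmentStatus
    case object InsertOverwriteInProgress extends SegmentStatus
    case object Streaming extends SegmentStatus
    case object MarkedForDelete extends SegmentStatus
    case object Success extends SegmentStatus

    final case class Segment(id: String, var status: SegmentStatus)

    // Returns the ids that could not be deleted; like updateDeletionStatus
    // above, the first protected segment short-circuits the whole request.
    def markForDelete(ids: Seq[String], segments: Seq[Segment]): Seq[String] = {
      for (id <- ids; seg <- segments.find(_.id == id)) {
        seg.status match {
          case Compacted | InsertInProgress | InsertOverwriteInProgress | Streaming =>
            return Seq(id)            // protected: report as invalid and stop
          case MarkedForDelete => ()  // already deleted, nothing to change
          case _ => seg.status = MarkedForDelete
        }
      }
      Seq.empty
    }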

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9901f8c..aaad207 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -55,6 +55,10 @@ case class CarbonDropTableCommand(
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+      if (carbonTable.isStreamingTable) {
+        // streaming table should acquire streaming.lock
+        carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
+      }
       val relationIdentifiers = carbonTable.getTableInfo.getParentRelationIdentifiers
       if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
         if (!dropChildTable) {
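
The drop command now acquires LockUsage.STREAMING_LOCK before dropping a
streaming table. An active streaming query holds that lock, so a concurrent
drop fails fast instead of deleting files out from under the writer. A minimal
sketch of the mutual exclusion, using a Semaphore as a stand-in for Carbon's
file-based lock (SimpleLock and the retry counts are assumptions, not the
ICarbonLock API):

    import java.util.concurrent.{Semaphore, TimeUnit}

    // Non-reentrant, so even a second acquire from the same JVM fails,
    // mimicking a file-based table lock.
    final class SimpleLock {
      private val sem = new Semaphore(1)
      def lockWithRetries(retries: Int = 3, waitMs: Long = 100): Boolean =
        (1 to retries).exists(_ => sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS))
      def unlock(): Unit = sem.release()
    }

    val streamingLock = new SimpleLock
    require(streamingLock.lockWithRetries())       // held by the streaming query
    assert(!streamingLock.lockWithRetries(2, 10))  // a concurrent DROP TABLE gives up
    streamingLock.unlock()                         // released when streaming stops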

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 2805114..6ff762a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -223,7 +223,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (property.isDefined) {
           if (!property.get._2.trim.equalsIgnoreCase("true")) {
             throw new MalformedCarbonCommandException(
-              "Streaming property can not be changed to 'false' once it is 'true'")
+              "Streaming property can not be changed once it is 'true'")
           }
         }
         ExecutedCommandExec(CarbonAlterTableSetCommand(tableName, properties, isView)) :: Nil
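
The reworded guard reflects that 'streaming'='true' is a one-way switch: once
a table is streaming, setting the property to any other value is rejected. A
short illustration of the resulting DDL behavior (the table name is an
assumption; sql() is Spark's SQL entry point):

    // Enabling streaming on a Carbon table is allowed:
    sql("ALTER TABLE streaming.sensor_events SET TBLPROPERTIES('streaming'='true')")
    // Turning it off again now fails with MalformedCarbonCommandException:
    //   "Streaming property can not be changed once it is 'true'"
    sql("ALTER TABLE streaming.sensor_events SET TBLPROPERTIES('streaming'='false')")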

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 62ab9af..a8ab6fb 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -105,7 +104,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true)
 
     // 11. table for delete segment test
-    createTable(tableName = "stream_table_delete", streaming = true, withBatchLoad = false)
+    createTable(tableName = "stream_table_delete_id", streaming = true, withBatchLoad = false)
+    createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad =
false)
 
     // 12. reject alter streaming properties
     createTable(tableName = "stream_table_alter", streaming = false, withBatchLoad = false)
@@ -126,6 +126,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     // 17. reopen streaming table after close
     createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false)
 
+    // 18. block drop table while streaming is in progress
+    createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -204,7 +206,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_compact")
     sql("drop table if exists streaming.stream_table_new")
     sql("drop table if exists streaming.stream_table_tolerant")
-    sql("drop table if exists streaming.stream_table_delete")
+    sql("drop table if exists streaming.stream_table_delete_id")
+    sql("drop table if exists streaming.stream_table_delete_date")
     sql("drop table if exists streaming.stream_table_alter")
     sql("drop table if exists streaming.stream_table_handoff")
     sql("drop table if exists streaming.stream_table_finish")
@@ -212,6 +215,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_close")
     sql("drop table if exists streaming.stream_table_close_auto_handoff")
     sql("drop table if exists streaming.stream_table_reopen")
+    sql("drop table if exists streaming.stream_table_drop")
   }
 
   // normal table not support streaming ingest
@@ -600,7 +604,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
 
     sql("alter table streaming.stream_table_compact compact 'minor'")
-    sql("show segments for table streaming.stream_table_compact").show
 
     val result = sql("show segments for table streaming.stream_table_compact").collect()
     result.foreach { row =>
@@ -635,48 +638,55 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
   test("test deleting streaming segment by ID while ingesting") {
     executeStreamingIngest(
-      tableName = "stream_table_delete",
+      tableName = "stream_table_delete_id",
       batchNums = 6,
       rowNumsEachBatch = 10000,
       intervalOfSource = 3,
       intervalOfIngest = 5,
-      continueSeconds = 15,
+      continueSeconds = 20,
       generateBadRecords = false,
       badRecordAction = "force",
       handoffSize = 1024L * 200,
       autoHandoff = false
     )
-    val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
-    val segmentId = beforeDelete.map(_.getString(0)).mkString(",")
-    sql(s"delete from table streaming.stream_table_delete where segment.id in ($segmentId)
")
+    Thread.sleep(10000)
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
+    val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",")
+    val msg = intercept[Exception] {
+      sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds1)
")
+    }
+    assertResult(s"Delete segment by Id is failed. Invalid ID is: ${beforeDelete.length -1}")(msg.getMessage)
 
-    val rows = sql("show segments for table streaming.stream_table_delete").collect()
-    rows.foreach { row =>
+    val segmentIds2 = beforeDelete.filter(_.getString(1).equals("Streaming Finish"))
+      .map(_.getString(0)).mkString(",")
+    sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds2)
")
+    val afterDelete = sql("show segments for table streaming.stream_table_delete_id").collect()
+    afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
       assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
     }
   }
 
   test("test deleting streaming segment by date while ingesting") {
     executeStreamingIngest(
-      tableName = "stream_table_delete",
+      tableName = "stream_table_delete_date",
       batchNums = 6,
       rowNumsEachBatch = 10000,
       intervalOfSource = 3,
       intervalOfIngest = 5,
-      continueSeconds = 15,
+      continueSeconds = 20,
       generateBadRecords = false,
       badRecordAction = "force",
       handoffSize = 1024L * 200,
       autoHandoff = false
     )
-    val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
-
-    sql(s"delete from table streaming.stream_table_delete where segment.starttime before
" +
+    Thread.sleep(10000)
+    val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
+    sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before
" +
         s"'2999-10-01 01:00:00'")
-
-    val rows = sql("show segments for table streaming.stream_table_delete").collect()
-    assertResult(beforeDelete.length)(rows.length)
-    rows.foreach { row =>
+    val segmentIds = beforeDelete.filter(_.getString(1).equals("Streaming"))
+    assertResult(1)(segmentIds.length)
+    val afterDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
+    afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
       assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
     }
   }
@@ -799,7 +809,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       autoHandoff = false
     )
     sql("alter table streaming.stream_table_finish finish streaming")
-    sql("show segments for table streaming.stream_table_finish").show(100, false)
 
     val segments = sql("show segments for table streaming.stream_table_finish").collect()
     assert(segments.length == 4 || segments.length == 5)
@@ -938,6 +947,32 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("block drop streaming table while streaming is in progress") {
+    val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    var server: ServerSocket = null
+    try {
+      server = getServerSocket
+      val thread1 = createWriteSocketThread(server, 2, 10, 5)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(1000)
+      val msg = intercept[Exception] {
+        sql(s"drop table streaming.stream_table_drop")
+      }
+      assertResult("Dropping table streaming.stream_table_drop failed: Acquire table lock
failed after retry, please try after some time;")(msg.getMessage)
+      thread1.interrupt()
+      thread2.interrupt()
+    } finally {
+      if (server != null) {
+        server.close()
+      }
+    }
+  }
+
   test("do not support creating datamap on streaming table") {
     assert(
       intercept[MalformedCarbonCommandException](

