carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [11/50] [abbrv] carbondata git commit: [CARBONDATA-1904][CARBONDATA-1905] Support auto handoff and close streaming
Date Tue, 09 Jan 2018 04:01:39 GMT
[CARBONDATA-1904][CARBONDATA-1905] Support auto handoff and close streaming

Add support for:
1. automatic handoff of streaming segments to columnar segments
2. converting a streaming table back to a normal table with the syntax
   "ALTER TABLE <table_name> COMPACT 'close_streaming'" (see the usage sketch below)
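
For illustration, a minimal Spark SQL sketch of the new DDL; the database and
table names are placeholders, and the statements mirror those exercised in
TestStreamingTableOperation:

  // convert the streaming table to a normal table, handing off all remaining
  // streaming segments to columnar segments first
  spark.sql("ALTER TABLE streaming.stream_table COMPACT 'close_streaming'")
  // streaming can later be re-enabled on the same table via table properties
  spark.sql("ALTER TABLE streaming.stream_table SET TBLPROPERTIES('streaming'='true')")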

This closes #1736


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a51ad30f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a51ad30f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a51ad30f

Branch: refs/heads/branch-1.3
Commit: a51ad30f6647387de818dbeb2c3c39cbac2a8416
Parents: 7f3c374
Author: QiangCai <qiangcai@qq.com>
Authored: Thu Dec 28 18:43:43 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Jan 3 08:53:20 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 +
 .../carbondata/core/util/CarbonProperties.java  |  21 ++
 .../CarbonAlterTableCompactionCommand.scala     |  66 +++++-
 .../CarbonAlterTableFinishStreaming.scala       |  28 ++-
 .../sql/execution/strategy/DDLStrategy.scala    |   3 +-
 .../TestStreamingTableOperation.scala           | 234 +++++++++++++++++--
 .../processing/merger/CompactionType.java       |   1 +
 .../streaming/segment/StreamSegment.java        |  71 ++----
 .../carbondata/streaming/StreamHandoffRDD.scala | 123 +++++-----
 .../streaming/CarbonAppendableStreamSink.scala  |  20 +-
 10 files changed, 437 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a05d023..fce8373 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1488,6 +1488,13 @@ public final class CarbonCommonConstants {
   public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size";
 
   /**
+   * enable auto handoff streaming segment
+   */
+  public static final String ENABLE_AUTO_HANDOFF = "carbon.streaming.auto.handoff.enabled";
+
+  public static final String ENABLE_AUTO_HANDOFF_DEFAULT = "true";
+
+  /**
    * the min handoff size of streaming segment, the unit is byte
    */
   public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64;
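
As a hedged side note (not part of this commit), the new flag could be set
programmatically before starting an ingest, assuming the usual
CarbonProperties.addProperty API; per isEnableAutoHandoff further down, any
value other than "true" (case-insensitive) disables auto handoff:

  import org.apache.carbondata.core.constants.CarbonCommonConstants
  import org.apache.carbondata.core.util.CarbonProperties

  // disable automatic handoff of finished streaming segments
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, "false")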

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 7b80a8b..19a3cf3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -107,6 +107,7 @@ public final class CarbonProperties {
     validateCarbonCSVReadBufferSizeByte();
     validateHandoffSize();
     validateCombineSmallInputFiles();
+    validateEnableAutoHandoff();
   }
 
   private void validateCarbonCSVReadBufferSizeByte() {
@@ -297,6 +298,19 @@ public final class CarbonProperties {
     }
   }
 
+  private void validateEnableAutoHandoff() {
+    String enableAutoHandoffStr =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF);
+    boolean isValid = CarbonUtil.validateBoolean(enableAutoHandoffStr);
+    if (!isValid) {
+      LOGGER.warn("The enable auto handoff value \"" + enableAutoHandoffStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
+          CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT);
+    }
+  }
+
   /**
    * This method validates the number of pages per blocklet column
    */
@@ -792,6 +806,13 @@ public final class CarbonProperties {
     return handoffSize;
   }
 
+  public boolean isEnableAutoHandoff() {
+    String enableAutoHandoffStr = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
+        CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT);
+    return enableAutoHandoffStr.equalsIgnoreCase("true");
+  }
+
   /**
    * Validate the restrictions
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 45d3537..6daaae5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -17,27 +17,34 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import java.io.{File, IOException}
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableModel, CarbonMergerMapping, CompactionModel, DataCommand}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
+import org.apache.carbondata.streaming.segment.StreamSegment
 
 /**
  * Command for the compaction in alter table command
@@ -143,8 +150,14 @@ case class CarbonAlterTableCompactionCommand(
     if (compactionType == CompactionType.STREAMING) {
       StreamHandoffRDD.startStreamingHandoffThread(
         carbonLoadModel,
-        sqlContext,
-        storeLocation)
+        sqlContext.sparkSession)
+      return
+    }
+
+    if (compactionType == CompactionType.CLOSE_STREAMING) {
+      closeStreamingTable(
+        carbonLoadModel,
+        sqlContext.sparkSession)
       return
     }
 
@@ -254,4 +267,51 @@ case class CarbonAlterTableCompactionCommand(
       }
     }
   }
+
+  def closeStreamingTable(
+      carbonLoadModel: CarbonLoadModel,
+      sparkSession: SparkSession
+  ): Unit = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // 1. acquire lock of streaming.lock
+    val streamingLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    try {
+      if (streamingLock.lockWithRetries()) {
+        // 2. convert segment status from "streaming" to "streaming finish"
+        StreamSegment.finishStreaming(carbonTable)
+        // 3. iterate to handoff all streaming segment to batch segment
+        StreamHandoffRDD.iterateStreamingHandoff(carbonLoadModel, sparkSession)
+        val tableIdentifier =
+          new TableIdentifier(carbonTable.getTableName, Option(carbonTable.getDatabaseName))
+        // 4. modify table to normal table
+        AlterTableUtil.modifyTableProperties(
+          tableIdentifier,
+          Map("streaming" -> "false"),
+          Seq.empty,
+          true)(sparkSession,
+          sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+        // 5. remove checkpoint
+        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir))
+        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir))
+      } else {
+        val msg = "Failed to close streaming table, because streaming is locked for table
" +
+                  carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+        LOGGER.error(msg)
+        throw new IOException(msg)
+      }
+    } finally {
+      if (streamingLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after streaming finished" +
+                    carbonTable.getDatabaseName() + "." + carbonTable.getTableName())
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table " +
+                     carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
+                     " during streaming finished")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index ce83815..59cc0c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.execution.command.management
 
+import java.io.IOException
+
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.MetadataCommand
 
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.streaming.segment.StreamSegment
 
 /**
@@ -31,7 +35,29 @@ case class CarbonAlterTableFinishStreaming(
   extends MetadataCommand {
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
-    StreamSegment.finishStreaming(carbonTable)
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val streamingLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+      LockUsage.STREAMING_LOCK)
+    try {
+      if (streamingLock.lockWithRetries()) {
+        StreamSegment.finishStreaming(carbonTable)
+      } else {
+        val msg = "Failed to finish streaming, because streaming is locked for table " +
+                  carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
+        LOGGER.error(msg)
+        throw new IOException(msg)
+      }
+    } finally {
+      if (streamingLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after streaming finished" +
+                    carbonTable.getDatabaseName() + "." + carbonTable.getTableName())
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table " +
+                     carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
+                     " during streaming finished")
+      }
+    }
     Seq.empty
   }
 }
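
For reference, a hedged usage sketch of the existing finish-streaming DDL this
command handles, as exercised in the test suite below (the table name is a
placeholder):

  // marks every "streaming" segment as "streaming finish"; throws IOException
  // if the streaming lock is currently held by an active ingest
  spark.sql("ALTER TABLE streaming.stream_table FINISH STREAMING")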

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 89fcfd2..45f0f0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -98,7 +98,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (CompactionType.MINOR == compactionType ||
               CompactionType.MAJOR == compactionType ||
               CompactionType.SEGMENT_INDEX == compactionType ||
-              CompactionType.STREAMING == compactionType) {
+              CompactionType.STREAMING == compactionType ||
+              CompactionType.CLOSE_STREAMING == compactionType) {
             ExecutedCommandExec(alterTable) :: Nil
           } else {
             throw new MalformedCarbonCommandException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index f581c72..62ab9af 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -114,6 +115,17 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 14. finish streaming
     createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true)
+
+    // 15. auto handoff streaming segment
+    createTable(tableName = "stream_table_auto_handoff", streaming = true, withBatchLoad
= false)
+
+    // 16. close streaming table
+    createTable(tableName = "stream_table_close", streaming = true, withBatchLoad = false)
+    createTable(tableName = "stream_table_close_auto_handoff", streaming = true, withBatchLoad
= false)
+
+    // 17. reopen streaming table after close
+    createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false)
+
   }
 
   test("validate streaming property") {
@@ -196,6 +208,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_alter")
     sql("drop table if exists streaming.stream_table_handoff")
     sql("drop table if exists streaming.stream_table_finish")
+    sql("drop table if exists streaming.stream_table_auto_handoff")
+    sql("drop table if exists streaming.stream_table_close")
+    sql("drop table if exists streaming.stream_table_close_auto_handoff")
+    sql("drop table if exists streaming.stream_table_reopen")
   }
 
   // normal table not support streaming ingest
@@ -239,7 +255,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 1,
       continueSeconds = 10,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     checkAnswer(
@@ -280,7 +297,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 1,
       continueSeconds = 10,
       generateBadRecords = true,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
     checkAnswer(
       sql("select count(*) from streaming.stream_table_socket"),
@@ -298,7 +316,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 1,
       continueSeconds = 10,
       generateBadRecords = true,
-      badRecordAction = "fail"
+      badRecordAction = "fail",
+      autoHandoff = false
     )
     val result = sql("select count(*) from streaming.bad_record_fail").collect()
     assert(result(0).getLong(0) < 25)
@@ -314,7 +333,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 1,
       continueSeconds = 20,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
     val result = sql("select count(*) from streaming.stream_table_1s").collect()
     // 20 seconds can't ingest all data, exists data delay
@@ -330,7 +350,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 10,
       continueSeconds = 50,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
     checkAnswer(
       sql("select count(*) from streaming.stream_table_10s"),
@@ -347,7 +368,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 5,
       continueSeconds = 30,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
     checkAnswer(
       sql("select count(*) from streaming.stream_table_batch"),
@@ -370,7 +392,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     val result = sql("select * from streaming.stream_table_scan order by id").collect()
@@ -393,7 +416,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     val result = sql("select * from streaming.stream_table_scan_complex order by id").collect()
@@ -418,7 +442,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = true,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     checkAnswer(
@@ -458,7 +483,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = true,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     checkAnswer(
@@ -503,7 +529,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = true,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     checkAnswer(
@@ -533,7 +560,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = true,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
 
     checkAnswer(
@@ -564,7 +592,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       intervalOfIngest = 4,
       continueSeconds = 20,
       generateBadRecords = false,
-      badRecordAction = "force"
+      badRecordAction = "force",
+      autoHandoff = false
     )
     for (_ <- 0 to 3) {
       executeBatchLoad("stream_table_compact")
@@ -593,7 +622,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds = 40,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200
+      handoffSize = 1024L * 200,
+      autoHandoff = false
     )
     assert(sql("show segments for table streaming.stream_table_new").count() > 1)
 
@@ -613,7 +643,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds = 15,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200
+      handoffSize = 1024L * 200,
+      autoHandoff = false
     )
     val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
     val segmentId = beforeDelete.map(_.getString(0)).mkString(",")
@@ -635,7 +666,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds = 15,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200
+      handoffSize = 1024L * 200,
+      autoHandoff = false
     )
     val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect()
 
@@ -668,7 +700,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         continueSeconds = 40,
         generateBadRecords = false,
         badRecordAction = "force",
-        handoffSize = 1024L * 200
+        handoffSize = 1024L * 200,
+        autoHandoff = false
       )
       checkAnswer(
         sql("select count(*) from streaming.stream_table_alter"),
@@ -698,7 +731,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds = 40,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200
+      handoffSize = 1024L * 200,
+      autoHandoff = false
     )
     val segments = sql("show segments for table streaming.stream_table_handoff").collect()
     assert(segments.length == 3 || segments.length == 4)
@@ -726,6 +760,31 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("auto handoff 'streaming finish' segment to columnar segment") {
+    executeStreamingIngest(
+      tableName = "stream_table_auto_handoff",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200,
+      autoHandoff = true
+    )
+    Thread.sleep(10000)
+    val segments = sql("show segments for table streaming.stream_table_auto_handoff").collect()
+    assertResult(5)(segments.length)
+    assertResult(2)(segments.filter(_.getString(1).equals("Success")).length)
+    assertResult(2)(segments.filter(_.getString(1).equals("Compacted")).length)
+    assertResult(1)(segments.filter(_.getString(1).equals("Streaming")).length)
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_auto_handoff"),
+      Seq(Row(6 * 10000))
+    )
+  }
+
   test("alter table finish streaming") {
     executeStreamingIngest(
       tableName = "stream_table_finish",
@@ -736,7 +795,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds = 40,
       generateBadRecords = false,
       badRecordAction = "force",
-      handoffSize = 1024L * 200
+      handoffSize = 1024L * 200,
+      autoHandoff = false
     )
     sql("alter table streaming.stream_table_finish finish streaming")
     sql("show segments for table streaming.stream_table_finish").show(100, false)
@@ -753,6 +813,131 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("alter table close streaming") {
+    executeStreamingIngest(
+      tableName = "stream_table_close",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200,
+      autoHandoff = false
+    )
+
+    val table1 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark)
+    assertResult(true)(table1.isStreamingTable)
+    sql("alter table streaming.stream_table_close compact 'close_streaming'")
+
+    val segments = sql("show segments for table streaming.stream_table_close").collect()
+    assertResult(6)(segments.length)
+    assertResult(3)(segments.filter(_.getString(1).equals("Success")).length)
+    assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length)
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_close"),
+      Seq(Row(6 * 10000))
+    )
+    val table2 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark)
+    assertResult(false)(table2.isStreamingTable)
+  }
+
+  test("alter table close streaming with auto handoff") {
+    executeStreamingIngest(
+      tableName = "stream_table_close_auto_handoff",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200,
+      autoHandoff = true
+    )
+    Thread.sleep(10000)
+
+    val table1 =
+      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark)
+    assertResult(true)(table1.isStreamingTable)
+
+    sql("alter table streaming.stream_table_close_auto_handoff compact 'close_streaming'")
+    val segments =
+      sql("show segments for table streaming.stream_table_close_auto_handoff").collect()
+    assertResult(6)(segments.length)
+    assertResult(3)(segments.filter(_.getString(1).equals("Success")).length)
+    assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length)
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_close_auto_handoff"),
+      Seq(Row(6 * 10000))
+    )
+
+    val table2 =
+      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark)
+    assertResult(false)(table2.isStreamingTable)
+  }
+
+  test("reopen streaming table") {
+    executeStreamingIngest(
+      tableName = "stream_table_reopen",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200,
+      autoHandoff = true
+    )
+    Thread.sleep(10000)
+
+    val table1 =
+      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
+    assertResult(true)(table1.isStreamingTable)
+
+    sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
+    val segments =
+      sql("show segments for table streaming.stream_table_reopen").collect()
+    assertResult(6)(segments.length)
+    assertResult(3)(segments.filter(_.getString(1).equals("Success")).length)
+    assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length)
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_reopen"),
+      Seq(Row(6 * 10000))
+    )
+
+    val table2 =
+      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
+    assertResult(false)(table2.isStreamingTable)
+
+    sql("ALTER TABLE streaming.stream_table_reopen SET TBLPROPERTIES('streaming'='true')")
+
+    val table3 =
+      CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark)
+    assertResult(true)(table3.isStreamingTable)
+
+    executeStreamingIngest(
+      tableName = "stream_table_reopen",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200,
+      autoHandoff = true
+    )
+    Thread.sleep(10000)
+
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_reopen"),
+      Seq(Row(6 * 10000 * 2))
+    )
+  }
+
   test("do not support creating datamap on streaming table") {
     assert(
       intercept[MalformedCarbonCommandException](
@@ -828,7 +1013,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
-      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT): Thread = {
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Thread = {
     new Thread() {
       override def run(): Unit = {
         var qry: StreamingQuery = null
@@ -848,6 +1035,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+            .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
             .start()
           qry.awaitTermination()
         } catch {
@@ -874,7 +1062,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       continueSeconds: Int,
       generateBadRecords: Boolean,
       badRecordAction: String,
-      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
   ): Unit = {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
@@ -896,7 +1085,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
-        handoffSize = handoffSize)
+        handoffSize = handoffSize,
+        autoHandoff = autoHandoff)
       thread1.start()
       thread2.start()
       Thread.sleep(continueSeconds * 1000)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 314afc2..39f56a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -29,5 +29,6 @@ public enum CompactionType {
     IUD_DELETE_DELTA,
     SEGMENT_INDEX,
     STREAMING,
+    CLOSE_STREAMING,
     NONE
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7a62183..69d0b8d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -186,62 +186,41 @@ public class StreamSegment {
    * change the status of the segment from "streaming" to "streaming finish"
    */
   public static void finishStreaming(CarbonTable carbonTable) throws IOException {
-    ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+    ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj(
         carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
-        LockUsage.STREAMING_LOCK);
+        LockUsage.TABLE_STATUS_LOCK);
     try {
-      if (streamingLock.lockWithRetries()) {
-        ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj(
-            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
-            LockUsage.TABLE_STATUS_LOCK);
-        try {
-          if (statusLock.lockWithRetries()) {
-            LoadMetadataDetails[] details =
-                SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
-            boolean updated = false;
-            for (LoadMetadataDetails detail : details) {
-              if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
-                detail.setLoadEndTime(System.currentTimeMillis());
-                detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
-                updated = true;
-              }
-            }
-            if (updated) {
-              CarbonTablePath tablePath =
-                  CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
-              SegmentStatusManager.writeLoadDetailsIntoFile(
-                  tablePath.getTableStatusFilePath(), details);
-            }
-          } else {
-            String msg = "Failed to acquire table status lock of " +
-                carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
-            LOGGER.error(msg);
-            throw new IOException(msg);
-          }
-        } finally {
-          if (statusLock.unlock()) {
-            LOGGER.info("Table unlocked successfully after table status updation" +
-                carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
-          } else {
-            LOGGER.error("Unable to unlock Table lock for table " +
-                carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
-                " during table status updation");
+      if (statusLock.lockWithRetries()) {
+        LoadMetadataDetails[] details =
+            SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+        boolean updated = false;
+        for (LoadMetadataDetails detail : details) {
+          if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
+            detail.setLoadEndTime(System.currentTimeMillis());
+            detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
+            updated = true;
           }
         }
+        if (updated) {
+          CarbonTablePath tablePath =
+              CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              tablePath.getTableStatusFilePath(),
+              details);
+        }
       } else {
-        String msg = "Failed to finish streaming, because streaming is locked for table "
+
-            carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+        String msg = "Failed to acquire table status lock of " + carbonTable.getDatabaseName()
+            + "." + carbonTable.getTableName();
         LOGGER.error(msg);
         throw new IOException(msg);
       }
     } finally {
-      if (streamingLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable
-            .getDatabaseName() + "." + carbonTable.getTableName());
+      if (statusLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation"
+            + carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
       } else {
-        LOGGER.error("Unable to unlock Table lock for table " +
-            carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
-            " during streaming finished");
+        LOGGER.error("Unable to unlock Table lock for table " + carbonTable.getDatabaseName()
+            + "." + carbonTable.getTableName() + " during table status updation");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 765a88b..c88575e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.streaming
 
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util
 import java.util.Date
@@ -25,13 +26,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
@@ -45,7 +46,7 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
 import org.apache.carbondata.spark.rdd.CarbonRDD
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.streaming.segment.StreamSegment
 
 /**
  * partition of the handoff segment
@@ -206,68 +207,73 @@ object StreamHandoffRDD {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
+  def iterateStreamingHandoff(
+      carbonLoadModel: CarbonLoadModel,
+      sparkSession: SparkSession
+  ): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val identifier = carbonTable.getAbsoluteTableIdentifier
+    val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
+    var continueHandoff = false
+    // require handoff lock on table
+    val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
+    try {
+      if (lock.lockWithRetries()) {
+        LOGGER.info("Acquired the handoff lock for table" +
+                    s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+        // handoff streaming segment one by one
+        do {
+          val segmentStatusManager = new SegmentStatusManager(identifier)
+          var loadMetadataDetails: Array[LoadMetadataDetails] = null
+          // lock table to read table status file
+          val statusLock = segmentStatusManager.getTableStatusLock
+          try {
+            if (statusLock.lockWithRetries()) {
+              loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+                tablePath.getMetadataDirectoryPath)
+            }
+          } finally {
+            if (null != statusLock) {
+              statusLock.unlock()
+            }
+          }
+          if (null != loadMetadataDetails) {
+            val streamSegments =
+              loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
+
+            continueHandoff = streamSegments.length > 0
+            if (continueHandoff) {
+              // handoff a streaming segment
+              val loadMetadataDetail = streamSegments(0)
+              executeStreamingHandoff(
+                carbonLoadModel,
+                sparkSession,
+                loadMetadataDetail.getLoadName
+              )
+            }
+          } else {
+            continueHandoff = false
+          }
+        } while (continueHandoff)
+      }
+    } finally {
+      if (null != lock) {
+        lock.unlock()
+      }
+    }
+  }
+
   /**
    * start new thread to execute stream segment handoff
    */
   def startStreamingHandoffThread(
       carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      storeLocation: String
+      sparkSession: SparkSession
   ): Unit = {
     // start a new thread to execute streaming segment handoff
     val handoffThread = new Thread() {
       override def run(): Unit = {
-        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-        val identifier = carbonTable.getAbsoluteTableIdentifier
-        val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
-        var continueHandoff = false
-        // require handoff lock on table
-        val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
-        try {
-          if (lock.lockWithRetries()) {
-            LOGGER.info("Acquired the handoff lock for table" +
-                        s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
-            // handoff streaming segment one by one
-            do {
-              val segmentStatusManager = new SegmentStatusManager(identifier)
-              var loadMetadataDetails: Array[LoadMetadataDetails] = null
-              // lock table to read table status file
-              val statusLock = segmentStatusManager.getTableStatusLock
-              try {
-                if (statusLock.lockWithRetries()) {
-                  loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                    tablePath.getMetadataDirectoryPath)
-                }
-              } finally {
-                if (null != statusLock) {
-                  statusLock.unlock()
-                }
-              }
-              if (null != loadMetadataDetails) {
-                val streamSegments =
-                  loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
-
-                continueHandoff = streamSegments.length > 0
-                if (continueHandoff) {
-                  // handoff a streaming segment
-                  val loadMetadataDetail = streamSegments(0)
-                  executeStreamingHandoff(
-                    carbonLoadModel,
-                    sqlContext,
-                    storeLocation,
-                    loadMetadataDetail.getLoadName
-                  )
-                }
-              } else {
-                continueHandoff = false
-              }
-            } while (continueHandoff)
-          }
-        } finally {
-          if (null != lock) {
-            lock.unlock()
-          }
-        }
+        iterateStreamingHandoff(carbonLoadModel, sparkSession)
       }
     }
     handoffThread.start()
@@ -278,8 +284,7 @@ object StreamHandoffRDD {
    */
   def executeStreamingHandoff(
       carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      storeLocation: String,
+      sparkSession: SparkSession,
       handoffSegmenId: String
   ): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -297,7 +302,7 @@ object StreamHandoffRDD {
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
-        sqlContext.sparkContext,
+        sparkSession.sparkContext,
         new HandoffResultImpl(),
         carbonLoadModel,
         handoffSegmenId).collect()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 064ba37..ce9446f 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD}
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 import org.apache.carbondata.streaming.segment.StreamSegment
 
@@ -80,6 +80,12 @@ class CarbonAppendableStreamSink(
     CarbonProperties.getInstance().getHandoffSize
   )
 
+  // auto handoff
+  private val enableAutoHandoff = hadoopConf.getBoolean(
+    CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
+    CarbonProperties.getInstance().isEnableAutoHandoff
+  )
+
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
@@ -127,14 +133,18 @@ class CarbonAppendableStreamSink(
     val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
     if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
-      val newSegmentId =
-        StreamSegment.close(carbonTable, currentSegmentId)
+      val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
       val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
       FileFactory.mkdirs(newSegmentDir, fileType)
-    }
 
-    // TODO trigger hand off operation
+      // TODO trigger hand off operation
+      if (enableAutoHandoff) {
+        StreamHandoffRDD.startStreamingHandoffThread(
+          carbonLoadModel,
+          sparkSession)
+      }
+    }
   }
 }
 


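Finally, a hedged sketch of a streaming ingest that sets the new auto-handoff
option on the sink, mirroring the writer options used in
TestStreamingTableOperation; the source DataFrame, checkpoint path and table
names are placeholders, and the "carbondata" format and ProcessingTime trigger
are assumed from the existing tests:

  import org.apache.spark.sql.streaming.ProcessingTime
  import org.apache.carbondata.core.constants.CarbonCommonConstants

  val query = inputDf.writeStream
    .format("carbondata")
    .trigger(ProcessingTime("2 seconds"))
    .option("checkpointLocation", "/tmp/stream_table_cp")  // placeholder path
    .option("dbName", "streaming")
    .option("tableName", "stream_table")
    // hand off once a streaming segment reaches the given size (value taken from the tests)
    .option(CarbonCommonConstants.HANDOFF_SIZE, 1024L * 200)
    // trigger the handoff thread automatically when a streaming segment is closed
    .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, true)
    .start()
  query.awaitTermination()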