carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1586][Streaming] Support handoff from row format to columnar format
Date Mon, 27 Nov 2017 09:10:47 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 373342d0e -> f8e0585d2


[CARBONDATA-1586][Streaming] Support handoff from row format to columnar format

1. Configuration:
carbon.handoff.size => carbon.streaming.segment.max.size
2. SQL command:
alter table <table_name> compact 'streaming'
3. Refactor CompactionType

This closes #1566


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8e0585d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8e0585d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8e0585d

Branch: refs/heads/master
Commit: f8e0585d2c31b90e70ff2574e47c6434d61fcced
Parents: 373342d
Author: QiangCai <qiangcai@qq.com>
Authored: Sat Nov 25 01:54:41 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Mon Nov 27 17:10:34 2017 +0800

----------------------------------------------------------------------
 .../apache/carbondata/core/locks/LockUsage.java |   1 +
 .../statusmanager/SegmentStatusManager.java     |   6 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   2 +-
 .../streaming/CarbonStreamOutputFormat.java     |   2 +-
 .../streaming/CarbonStreamRecordReader.java     |  69 +++-
 .../org/apache/carbondata/spark/KeyVal.scala    |   9 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  19 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |   4 +-
 .../spark/rdd/DataManagementFunc.scala          |   8 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  45 +-
 .../AlterTableCompactionCommand.scala           |  60 +--
 .../command/mutation/HorizontalCompaction.scala |   8 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  16 +-
 .../TestStreamingTableOperation.scala           |  49 ++-
 .../merger/AbstractResultProcessor.java         |   2 +-
 .../processing/merger/CarbonCompactionUtil.java |  10 +-
 .../processing/merger/CarbonDataMergerUtil.java |  16 +-
 .../merger/CompactionResultSortProcessor.java   |  14 +-
 .../processing/merger/CompactionType.java       |  11 +-
 .../carbondata/streaming/StreamHandoffRDD.scala | 408 +++++++++++++++++++
 21 files changed, 654 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index 434129c..ca9a721 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -25,6 +25,7 @@ public class LockUsage {
   public static final String LOCK = ".lock";
   public static final String METADATA_LOCK = "meta.lock";
   public static final String COMPACTION_LOCK = "compaction.lock";
+  public static final String HANDOFF_LOCK = "handoff.lock";
   public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock";
   public static final String ALTER_PARTITION_LOCK = "alter_partition.lock";
   public static final String TABLE_STATUS_LOCK = "tablestatus.lock";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 2409219..aea9110 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -180,15 +180,15 @@ public class SegmentStatusManager {
   /**
    * This method reads the load metadata file
    *
-   * @param tableFolderPath
+   * @param metadataFolderPath
    * @return
    */
-  public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
+  public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
-    String metadataFileName = tableFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
         + CarbonCommonConstants.LOADMETADATA_FILENAME;
     LoadMetadataDetails[] listOfLoadFolderDetailsArray;
     AtomicFileOperations fileOperation =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 06a4006..e4e9ceb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -462,7 +462,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  private List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
       List<String> streamSegments) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
index 47b43c4..8497d2b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -57,7 +57,7 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
    * if the byte size of streaming segment reach this value,
    * the system will create a new stream segment
    */
-  public static final String HANDOFF_SIZE = "carbon.handoff.size";
+  public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size";
 
   /**
    * the min handoff size of streaming segment, the unit is byte

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index f7169a0..0086a3c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -134,6 +134,9 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   // empty project, null filter
   private boolean skipScanData;
 
+  // return raw row for handoff
+  private boolean useRawRow = false;
+
   @Override public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
     // input
@@ -256,6 +259,10 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     return header.getSync_marker();
   }
 
+  public void setUseRawRow(boolean useRawRow) {
+    this.useRawRow = useRawRow;
+  }
+
   private void initializeAtFirstRow() throws IOException {
     filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
     filterRow = new RowImpl();
@@ -342,7 +349,12 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
             input.nextRow();
             scanMore = false;
           } else {
-            readRowFromStream();
+            if (useRawRow) {
+              // read raw row for streaming handoff which does not require decode raw row
+              readRawRowFromStream();
+            } else {
+              readRowFromStream();
+            }
             if (null != filter) {
               scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
             } else {
@@ -605,6 +617,61 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     }
   }
 
+  private void readRawRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        if (isNoDictColumn[colCount]) {
+          int v = input.readShort();
+          outputValues[colCount] = input.readBytes(v);
+        } else {
+          outputValues[colCount] = input.readInt();
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        short v = input.readShort();
+        outputValues[colCount] = input.readBytes(v);
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        outputValues[colCount] = null;
+      } else {
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          outputValues[colCount] = input.readBoolean();
+        } else if (dataType == DataTypes.SHORT) {
+          outputValues[colCount] = input.readShort();
+        } else if (dataType == DataTypes.INT) {
+          outputValues[colCount] = input.readInt();
+        } else if (dataType == DataTypes.LONG) {
+          outputValues[colCount] = input.readLong();
+        } else if (dataType == DataTypes.DOUBLE) {
+          outputValues[colCount] = input.readDouble();
+        } else if (DataTypes.isDecimal(dataType)) {
+          int len = input.readShort();
+          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+        }
+      }
+    }
+  }
+
   private void putRowToColumnBatch(int rowId) {
     for (int i = 0; i < projection.length; i++) {
       Object value = outputValues[i];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index bbf242e..450fed0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -107,6 +107,15 @@ class MergeResultImpl extends MergeResult[String, Boolean] {
   override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
 }
 
+trait HandoffResult[K, V] extends Serializable {
+  def getKey(key: String, value: Boolean): (K, V)
+
+}
+
+class HandoffResultImpl extends HandoffResult[String, Boolean] {
+  override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
+}
+
 trait AlterPartitionResult[K, V] extends Serializable {
   def getKey(key: String, value: Boolean): (K, V)
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index db2037c..997838c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -116,7 +116,7 @@ class CarbonMergerRDD[K, V](
 
         // During UPDATE DELTA COMPACTION case all the blocks received in compute belongs to
         // one segment, so max cardinality will be calculated from first block of segment
-        if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+        if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
           var dataFileFooter: DataFileFooter = null
           try {
             // As the tableBlockInfoList is sorted take the ColCardinality from the last
@@ -137,15 +137,14 @@ class CarbonMergerRDD[K, V](
           carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
             .toList
         }
-        mergeNumber = if (carbonMergerMapping.campactionType ==
-                              CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+        mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
           tableBlockInfoList.get(0).getSegmentId
-        }
-        else {
-          mergedLoadName
-            .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-                       CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
-            )
+        } else {
+          mergedLoadName.substring(
+            mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+              CarbonCommonConstants.LOAD_FOLDER.length(),
+            mergedLoadName.length()
+          )
         }
         carbonLoadModel.setSegmentId(mergeNumber)
         val tempLocationKey = CarbonDataProcessorUtil
@@ -277,7 +276,7 @@ class CarbonMergerRDD[K, V](
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
     var taskPartitionNo = 0
-    var carbonPartitionId = 0;
+    var carbonPartitionId = 0
     var noOfBlocks = 0
 
     val taskInfoList = new java.util.ArrayList[Distributable]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index cc40f13..7eaf95a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -308,10 +308,11 @@ class CarbonScanRDD(
           val value = reader.getCurrentValue
           value
         }
+
         private def close() {
           TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
           inputMetricsStats.updateAndClose()
-      }
+        }
       }
     } else {
       new Iterator[Any] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 911dc9e..ba634b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -87,7 +87,7 @@ object Compactor {
     }
 
     val mergeStatus =
-    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       new CarbonIUDMergerRDD(
         sc.sparkContext,
         new MergeResultImpl(),
@@ -124,7 +124,7 @@ object Compactor {
       val endTime = System.nanoTime()
       logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
       val statusFileUpdation =
-        ((compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) &&
+        ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
           CarbonDataMergerUtil
             .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
               carbonTable.getMetaDataFilepath,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 762dcee..26a66f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.{CompactionCallableModel, Compacti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -62,7 +61,7 @@ object DataManagementFunc {
       compactionModel.compactionType
     )
     while (loadsToMerge.size() > 1 ||
-           (compactionModel.compactionType.name().equals("IUD_UPDDEL_DELTA_COMPACTION") &&
+           (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
             loadsToMerge.size() > 0)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
       deletePartialLoadsInCompaction(carbonLoadModel)
@@ -97,13 +96,13 @@ object DataManagementFunc {
       // in case of major compaction we will scan only once and come out as it will keep
       // on doing major for the new loads also.
       // excluding the newly added segments.
-      if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
+      if (CompactionType.MAJOR == compactionModel.compactionType) {
 
         segList = CarbonDataMergerUtil
           .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
       }
 
-      if (compactionModel.compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
         loadsToMerge.clear()
       } else if (segList.size > 0) {
         loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
@@ -223,5 +222,4 @@ object DataManagementFunc {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 284587d..f384d28 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -83,29 +83,34 @@ object CarbonDataRDDFactory {
       carbonTable: CarbonTable,
       compactionModel: CompactionModel): Unit = {
     // taking system level lock at the mdt file location
-    var configuredMdtPath = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
-        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
+    var configuredMdtPath = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
+      CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
+
     configuredMdtPath = CarbonUtil.checkAndAppendFileSystemURIScheme(configuredMdtPath)
-    val lock = CarbonLockFactory
-      .getCarbonLockObj(configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
-                        CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
-        LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
+    val lock = CarbonLockFactory.getCarbonLockObj(
+      configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
+        CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+      LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
+
     if (lock.lockWithRetries()) {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
           s".${ carbonLoadModel.getTableName }")
       try {
-        if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+        if (compactionType == CompactionType.SEGMENT_INDEX) {
           // Just launch job to merge index and return
-          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+          CommonUtil.mergeIndexFiles(
+            sqlContext.sparkContext,
             CarbonDataMergerUtil.getValidSegmentList(
               carbonTable.getAbsoluteTableIdentifier).asScala,
             carbonLoadModel.getTablePath,
-            carbonTable, true)
+            carbonTable,
+            true)
           lock.unlock()
           return
         }
-        startCompactionThreads(sqlContext,
+        startCompactionThreads(
+          sqlContext,
           carbonLoadModel,
           storeLocation,
           compactionModel,
@@ -148,7 +153,7 @@ object CarbonDataRDDFactory {
       compactionLock: ICarbonLock): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
-    if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+    if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
       // update the updated table status. For the case of Update Delta Compaction the Metadata
       // is filled in LoadModel, no need to refresh.
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
@@ -197,8 +202,7 @@ object CarbonDataRDDFactory {
 
               val newCarbonLoadModel = prepareCarbonLoadModel(table)
 
-              val compactionSize = CarbonDataMergerUtil
-                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+              val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR)
 
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
@@ -231,11 +235,10 @@ object CarbonDataRDDFactory {
                 }
               }
               // ********* check again for all the tables.
-              tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
-                  .carbonMetastore.listAllTables(sqlContext.sparkSession).toArray,
-                  skipCompactionTables.asJava
-                )
+              tableForCompaction = CarbonCompactionUtil.getNextTableToCompact(
+                CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+                  .listAllTables(sqlContext.sparkSession).toArray,
+                skipCompactionTables.asJava)
             }
           }
           // giving the user his error for telling in the beeline if his triggered table
@@ -695,7 +698,7 @@ object CarbonDataRDDFactory {
       val compactionSize = 0
       val isCompactionTriggerByDDl = false
       val compactionModel = CompactionModel(compactionSize,
-        CompactionType.MINOR_COMPACTION,
+        CompactionType.MINOR,
         carbonTable,
         isCompactionTriggerByDDl
       )
@@ -718,7 +721,7 @@ object CarbonDataRDDFactory {
         handleCompactionForSystemLocking(sqlContext,
           carbonLoadModel,
           storeLocation,
-          CompactionType.MINOR_COMPACTION,
+          CompactionType.MINOR,
           carbonTable,
           compactionModel
         )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 2cd771c..cc398b5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.streaming.StreamHandoffRDD
 
 /**
  * Command for the compaction in alter table command
@@ -82,17 +83,16 @@ case class AlterTableCompactionCommand(
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTablePath(table.getTablePath)
 
-    var storeLocation = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-        System.getProperty("java.io.tmpdir")
-      )
+    var storeLocation = CarbonProperties.getInstance.getProperty(
+      CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+      System.getProperty("java.io.tmpdir"))
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
     try {
-      alterTableForCompaction(sparkSession.sqlContext,
-          alterTableModel,
-          carbonLoadModel,
-          storeLocation
-        )
+      alterTableForCompaction(
+        sparkSession.sqlContext,
+        alterTableModel,
+        carbonLoadModel,
+        storeLocation)
     } catch {
       case e: Exception =>
         if (null != e.getMessage) {
@@ -110,26 +110,16 @@ case class AlterTableCompactionCommand(
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
       storeLocation: String): Unit = {
-    val LOGGER: LogService =
-    LogServiceFactory.getLogService(this.getClass.getName)
-    var compactionSize: Long = 0
-    var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
-    if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
-      compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
-      compactionType = CompactionType.MAJOR_COMPACTION
-    } else if (alterTableModel.compactionType.equalsIgnoreCase(
-      CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
-      compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
+    val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType)
+    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       if (alterTableModel.segmentUpdateStatusManager.isDefined) {
         carbonLoadModel.setSegmentUpdateStatusManager(
           alterTableModel.segmentUpdateStatusManager.get)
         carbonLoadModel.setLoadMetadataDetails(
           alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
       }
-    } else if (alterTableModel.compactionType.equalsIgnoreCase("segment_index")) {
-      compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
-    } else {
-      compactionType = CompactionType.MINOR_COMPACTION
     }
 
     LOGGER.audit(s"Compaction request received for table " +
@@ -139,6 +129,15 @@ case class AlterTableCompactionCommand(
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
+
+    if (compactionType == CompactionType.STREAMING) {
+      StreamHandoffRDD.startStreamingHandoffThread(
+        carbonLoadModel,
+        sqlContext,
+        storeLocation)
+      return
+    }
+
     // reading the start time of data load.
     val loadStartTime : Long =
       if (alterTableModel.factTimeStamp.isEmpty) {
@@ -166,7 +165,8 @@ case class AlterTableCompactionCommand(
     // so that this will be taken up by the compaction process which is executing.
     if (!isConcurrentCompactionAllowed) {
       LOGGER.info("System level compaction lock is enabled.")
-      CarbonDataRDDFactory.handleCompactionForSystemLocking(sqlContext,
+      CarbonDataRDDFactory.handleCompactionForSystemLocking(
+        sqlContext,
         carbonLoadModel,
         storeLocation,
         compactionType,
@@ -175,16 +175,15 @@ case class AlterTableCompactionCommand(
       )
     } else {
       // normal flow of compaction
-      val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-          LockUsage.COMPACTION_LOCK
-        )
+      val lock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
 
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the compaction lock for table" +
                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
-          if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+          if (compactionType == CompactionType.SEGMENT_INDEX) {
             // Just launch job to merge index and return
             CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
               CarbonDataMergerUtil.getValidSegmentList(
@@ -194,7 +193,8 @@ case class AlterTableCompactionCommand(
             lock.unlock()
             return
           }
-          CarbonDataRDDFactory.startCompactionThreads(sqlContext,
+          CarbonDataRDDFactory.startCompactionThreads(
+            sqlContext,
             carbonLoadModel,
             storeLocation,
             compactionModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 6762489..a875ad6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -53,7 +53,7 @@ object HorizontalCompaction {
       return
     }
 
-    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
     val carbonTable = carbonRelation.carbonTable
     val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val updateTimeStamp = System.currentTimeMillis()
@@ -78,7 +78,7 @@ object HorizontalCompaction {
     if (isUpdateOperation) {
 
       // This is only update operation, perform only update compaction.
-      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
       performUpdateDeltaCompaction(sparkSession,
         compactionTypeIUD,
         carbonTable,
@@ -89,7 +89,7 @@ object HorizontalCompaction {
     }
 
     // After Update Compaction perform delete compaction
-    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
+    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA
     segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
     if (segLists == null || segLists.size() == 0) {
       return
@@ -135,7 +135,7 @@ object HorizontalCompaction {
       val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
         carbonTable.getTableName,
         Some(segmentUpdateStatusManager),
-        CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
+        CompactionType.IUD_UPDDEL_DELTA.toString,
         Some(factTimeStamp),
         "")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index bf037d1..2418d2a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsComm
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 
+import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
@@ -83,9 +84,18 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(altertablemodel.tableName,
             altertablemodel.dbName))(sparkSession)
         if (isCarbonTable) {
-          if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
-              altertablemodel.compactionType.equalsIgnoreCase("major") ||
-              altertablemodel.compactionType.equalsIgnoreCase("segment_index")) {
+          var compactionType: CompactionType = null
+          try {
+            compactionType = CompactionType.valueOf(altertablemodel.compactionType.toUpperCase)
+          } catch {
+            case _ =>
+              throw new MalformedCarbonCommandException(
+                "Unsupported alter operation on carbon table")
+          }
+          if (CompactionType.MINOR == compactionType ||
+              CompactionType.MAJOR == compactionType ||
+              CompactionType.SEGMENT_INDEX == compactionType ||
+              CompactionType.STREAMING == compactionType) {
             ExecutedCommandExec(alterTable) :: Nil
           } else {
             throw new MalformedCarbonCommandException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index d9591c4..3b9b2c3 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -109,6 +109,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 12. reject alter streaming properties
     createTable(tableName = "stream_table_alter", streaming = true, withBatchLoad = false)
+
+    // 13. handoff streaming segment
+    createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -189,6 +192,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_tolerant")
     sql("drop table if exists streaming.stream_table_delete")
     sql("drop table if exists streaming.stream_table_alter")
+    sql("drop table if exists streaming.stream_table_handoff")
   }
 
   // normal table not support streaming ingest
@@ -251,7 +255,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     // streaming ingest 10 rows
     generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
     val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
-      identifier )
+      identifier)
     thread.start()
     Thread.sleep(2000)
     generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
@@ -327,7 +331,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
     checkAnswer(
       sql("select count(*) from streaming.stream_table_10s"),
-      Seq(Row(5 + 10000*5)))
+      Seq(Row(5 + 10000 * 5)))
   }
 
   // batch loading on streaming table
@@ -344,13 +348,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     )
     checkAnswer(
       sql("select count(*) from streaming.stream_table_batch"),
-      Seq(Row(100*5)))
+      Seq(Row(100 * 5)))
 
     executeBatchLoad("stream_table_batch")
 
     checkAnswer(
       sql("select count(*) from streaming.stream_table_batch"),
-      Seq(Row(100*5 + 5)))
+      Seq(Row(100 * 5 + 5)))
   }
 
   // detail query on batch and stream segment
@@ -666,6 +670,43 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("handoff 'streaming finish' segment to columnar segment") {
+    executeStreamingIngest(
+      tableName = "stream_table_handoff",
+      batchNums = 6,
+      rowNumsEachBatch = 10000,
+      intervalOfSource = 5,
+      intervalOfIngest = 10,
+      continueSeconds = 40,
+      generateBadRecords = false,
+      badRecordAction = "force",
+      handoffSize = 1024L * 200
+    )
+    val segments = sql("show segments for table streaming.stream_table_handoff").collect()
+    assertResult(3)(segments.length)
+    assertResult("Streaming")(segments(0).getString(1))
+    assertResult("Streaming Finish")(segments(1).getString(1))
+    assertResult("Streaming Finish")(segments(2).getString(1))
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_handoff"),
+      Seq(Row(6 * 10000))
+    )
+
+    sql("alter table streaming.stream_table_handoff compact 'streaming'")
+    Thread.sleep(10000)
+    val newSegments = sql("show segments for table streaming.stream_table_handoff").collect()
+    assertResult(5)(newSegments.length)
+    assertResult("Success")(newSegments(0).getString(1))
+    assertResult("Success")(newSegments(1).getString(1))
+    assertResult("Streaming")(newSegments(2).getString(1))
+    assertResult("Compacted")(newSegments(3).getString(1))
+    assertResult("Compacted")(newSegments(4).getString(1))
+    checkAnswer(
+      sql("select count(*) from streaming.stream_table_handoff"),
+      Seq(Row(6 * 10000))
+    )
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index 6da50a9..c32aa51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -45,7 +45,7 @@ public abstract class AbstractResultProcessor {
       CompactionType compactionType, CarbonTable carbonTable,
       CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
-    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+    if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
       int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
           CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
               carbonTable.getCarbonTableIdentifier()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index c60bb24..d796262 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -171,17 +171,17 @@ public class CarbonCompactionUtil {
     try {
       if (FileFactory.isFileExist(minorCompactionStatusFile,
           FileFactory.getFileType(minorCompactionStatusFile))) {
-        return CompactionType.MINOR_COMPACTION;
+        return CompactionType.MINOR;
       }
       if (FileFactory.isFileExist(majorCompactionStatusFile,
           FileFactory.getFileType(majorCompactionStatusFile))) {
-        return CompactionType.MAJOR_COMPACTION;
+        return CompactionType.MAJOR;
       }
 
     } catch (IOException e) {
       LOGGER.error("Exception in determining the compaction request file " + e.getMessage());
     }
-    return CompactionType.MINOR_COMPACTION;
+    return CompactionType.MINOR;
   }
 
   /**
@@ -193,7 +193,7 @@ public class CarbonCompactionUtil {
   public static boolean deleteCompactionRequiredFile(String metaFolderPath,
       CompactionType compactionType) {
     String compactionRequiredFile;
-    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+    if (compactionType.equals(CompactionType.MINOR)) {
       compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
           + CarbonCommonConstants.minorCompactionRequiredFile;
     } else {
@@ -229,7 +229,7 @@ public class CarbonCompactionUtil {
   public static boolean createCompactionRequiredFile(String metaFolderPath,
       CompactionType compactionType) {
     String statusFile;
-    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+    if (CompactionType.MINOR == compactionType) {
       statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
           + CarbonCommonConstants.minorCompactionRequiredFile;
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 15ee4fb..d6f2d9a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -332,7 +332,7 @@ public final class CarbonDataMergerUtil {
         loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
         loadMetadataDetails.setPartitionCount("0");
         // if this is a major compaction then set the segment as major compaction.
-        if (compactionType == CompactionType.MAJOR_COMPACTION) {
+        if (CompactionType.MAJOR == compactionType) {
           loadMetadataDetails.setMajorCompacted("true");
         }
 
@@ -394,7 +394,7 @@ public final class CarbonDataMergerUtil {
     sortSegments(sortedSegments);
 
     // Check for segments which are qualified for IUD compaction.
-    if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
 
       return identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel);
     }
@@ -410,7 +410,7 @@ public final class CarbonDataMergerUtil {
         identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
     List<LoadMetadataDetails> listOfSegmentsToBeMerged;
     // identify the segments to merge based on the Size of the segments across partition.
-    if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
+    if (CompactionType.MAJOR == compactionType) {
 
       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
           listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath);
@@ -574,7 +574,7 @@ public final class CarbonDataMergerUtil {
    * @param listOfSegmentsAfterPreserve  the segments list after
    *        preserving the configured number of latest loads
    * @param carbonLoadModel carbon load model
-   * @param storeLocation the store location of the segment
+   * @param tablePath the store location of the segment
    * @return the list of segments that need to be merged
    *         based on the Size in case of Major compaction
    */
@@ -641,7 +641,7 @@ public final class CarbonDataMergerUtil {
 
   /**
    * For calculating the size of the specified segment
-   * @param storePath the store path of the segment
+   * @param tablePath the store path of the segment
    * @param tableIdentifier identifier of table that the segment belong to
    * @param segId segment id
    * @return the data size of the segment
@@ -814,7 +814,7 @@ public final class CarbonDataMergerUtil {
 
     long compactionSize = 0;
     switch (compactionType) {
-      case MAJOR_COMPACTION:
+      case MAJOR:
         compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
         break;
       default: // this case can not come.
@@ -926,7 +926,7 @@ public final class CarbonDataMergerUtil {
 
     List<String> validSegments = new ArrayList<>();
 
-    if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
+    if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
       int numberDeleteDeltaFilesThreshold =
           CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
       List<String> deleteSegments = new ArrayList<>();
@@ -948,7 +948,7 @@ public final class CarbonDataMergerUtil {
           }
         }
       }
-    } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+    } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
       int numberUpdateDeltaFilesThreshold =
           CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
       for (String seg : Segments) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 187ba06..d115a7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -196,9 +196,17 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private void processResult(List<RawResultIterator> resultIteratorList) throws Exception {
     for (RawResultIterator resultIterator : resultIteratorList) {
-      while (resultIterator.hasNext()) {
-        addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
-        isRecordFound = true;
+      if (CompactionType.STREAMING == compactionType) {
+        while (resultIterator.hasNext()) {
+          // the input iterator of a streaming segment already uses raw rows
+          addRowForSorting(resultIterator.next());
+          isRecordFound = true;
+        }
+      } else {
+        while (resultIterator.hasNext()) {
+          addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
+          isRecordFound = true;
+        }
       }
     }
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 863257c..314afc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -23,10 +23,11 @@ package org.apache.carbondata.processing.merger;
  * called IUD compaction.
  */
 public enum CompactionType {
-    MINOR_COMPACTION,
-    MAJOR_COMPACTION,
-    IUD_UPDDEL_DELTA_COMPACTION,
-    IUD_DELETE_DELTA_COMPACTION,
-    SEGMENT_INDEX_COMPACTION,
+    MINOR,
+    MAJOR,
+    IUD_UPDDEL_DELTA,
+    IUD_DELETE_DELTA,
+    SEGMENT_INDEX,
+    STREAMING,
     NONE
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
new file mode 100644
index 0000000..f268883
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
+import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * partition of the handoff segment
+ */
+class HandoffPartition(
+    val rddId: Int,
+    val idx: Int,
+    @transient val inputSplit: CarbonInputSplit
+) extends Partition {
+
+  val split = new SerializableWritable[CarbonInputSplit](inputSplit)
+
+  override val index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * wrap the record reader of the handoff segment as a RawResultIterator
+ */
+class StreamingRawResultIterator(
+    recordReader: CarbonStreamRecordReader
+) extends RawResultIterator(null, null, null) {
+
+  override def hasNext: Boolean = {
+    recordReader.nextKeyValue()
+  }
+
+  override def next(): Array[Object] = {
+    recordReader
+      .getCurrentValue
+      .asInstanceOf[GenericInternalRow]
+      .values
+      .asInstanceOf[Array[Object]]
+  }
+}
+
+/**
+ * execute streaming segment handoff
+ */
+class StreamHandoffRDD[K, V](
+    sc: SparkContext,
+    result: HandoffResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    handOffSegmentId: String
+) extends CarbonRDD[(K, V)](sc, Nil) {
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  override def internalCompute(
+      split: Partition,
+      context: TaskContext
+  ): Iterator[(K, V)] = {
+    carbonLoadModel.setPartitionId("0")
+    carbonLoadModel.setTaskNo("" + split.index)
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // the input iterator uses raw rows
+    val iteratorList = prepareInputIterator(split, carbonTable)
+    // use CompactionResultSortProcessor to sort data and write to columnar files
+    val processor = prepareHandoffProcessor(carbonTable)
+    val status = processor.execute(iteratorList)
+
+    new Iterator[(K, V)] {
+      private var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey("" + split.index, status)
+      }
+    }
+  }
+
+  /**
+   * prepare the input iterator based on CarbonStreamRecordReader
+   */
+  private def prepareInputIterator(
+      split: Partition,
+      carbonTable: CarbonTable
+  ): util.ArrayList[RawResultIterator] = {
+    val inputSplit = split.asInstanceOf[HandoffPartition].split.value
+    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+    val hadoopConf = new Configuration()
+    CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
+    CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
+    CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
+    val projection = new CarbonProjection
+    val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
+    (0 until dataFields.size()).foreach { index =>
+      projection.addColumn(dataFields.get(index).getColName)
+    }
+    CarbonTableInputFormat.setColumnProjection(hadoopConf, projection)
+    val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+    val format = new CarbonTableInputFormat[Array[Object]]()
+    val model = format.getQueryModel(inputSplit, attemptContext)
+    val inputFormat = new CarbonStreamInputFormat
+    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+      .asInstanceOf[CarbonStreamRecordReader]
+    streamReader.setVectorReader(false)
+    streamReader.setQueryModel(model)
+    streamReader.setUseRawRow(true)
+    streamReader.initialize(inputSplit, attemptContext)
+    val iteratorList = new util.ArrayList[RawResultIterator](1)
+    iteratorList.add(new StreamingRawResultIterator(streamReader))
+    iteratorList
+  }
+
+  private def prepareHandoffProcessor(
+      carbonTable: CarbonTable
+  ): CompactionResultSortProcessor = {
+    val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
+      carbonTable.getDimensionByTableName(carbonTable.getTableName),
+      carbonTable.getMeasureByTableName(carbonTable.getTableName))
+    val dimLensWithComplex =
+      (0 until wrapperColumnSchemaList.size()).map(_ => Integer.MAX_VALUE).toArray
+    val dictionaryColumnCardinality =
+      CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
+    val segmentProperties =
+      new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)
+
+    new CompactionResultSortProcessor(
+      carbonLoadModel,
+      carbonTable,
+      segmentProperties,
+      CompactionType.STREAMING,
+      carbonTable.getTableName
+    )
+  }
+
+  /**
+   * get the partitions of the handoff segment
+   */
+  override protected def getPartitions: Array[Partition] = {
+    val job = Job.getInstance(FileFactory.getConfiguration)
+    val inputFormat = new CarbonTableInputFormat[Array[Object]]()
+    val segmentList = new util.ArrayList[String](1)
+    segmentList.add(handOffSegmentId)
+    val splits = inputFormat.getSplitsOfStreaming(
+      job,
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
+      segmentList
+    )
+
+    (0 until splits.size()).map { index =>
+      new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
+    }.toArray[Partition]
+  }
+}
+
+object StreamHandoffRDD {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * start new thread to execute stream segment handoff
+   */
+  def startStreamingHandoffThread(
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      storeLocation: String
+  ): Unit = {
+    // start a new thread to execute streaming segment handoff
+    val handoffThread = new Thread() {
+      override def run(): Unit = {
+        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+        val identifier = carbonTable.getAbsoluteTableIdentifier
+        val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
+        var continueHandoff = false
+        // require handoff lock on table
+        val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
+        try {
+          if (lock.lockWithRetries()) {
+            LOGGER.info("Acquired the handoff lock for table" +
+                        s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+            // handoff streaming segment one by one
+            do {
+              val segmentStatusManager = new SegmentStatusManager(identifier)
+              var loadMetadataDetails: Array[LoadMetadataDetails] = null
+              // lock table to read table status file
+              val statusLock = segmentStatusManager.getTableStatusLock
+              try {
+                if (statusLock.lockWithRetries()) {
+                  loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+                    tablePath.getMetadataDirectoryPath)
+                }
+              } finally {
+                if (null != statusLock) {
+                  statusLock.unlock()
+                }
+              }
+              if (null != loadMetadataDetails) {
+                val streamSegments =
+                  loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
+
+                continueHandoff = streamSegments.length > 0
+                if (continueHandoff) {
+                  // handoff a streaming segment
+                  val loadMetadataDetail = streamSegments(0)
+                  executeStreamingHandoff(
+                    carbonLoadModel,
+                    sqlContext,
+                    storeLocation,
+                    loadMetadataDetail.getLoadName
+                  )
+                }
+              } else {
+                continueHandoff = false
+              }
+            } while (continueHandoff)
+          }
+        } finally {
+          if (null != lock) {
+            lock.unlock()
+          }
+        }
+      }
+    }
+    handoffThread.start()
+  }
+
+  /**
+   * invoke StreamHandoffRDD to hand off a streaming segment to a columnar segment
+   */
+  def executeStreamingHandoff(
+      carbonLoadModel: CarbonLoadModel,
+      sqlContext: SQLContext,
+      storeLocation: String,
+      handoffSegmenId: String
+  ): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    var loadStatus = SegmentStatus.SUCCESS
+    var errorMessage: String = "Handoff failure"
+    try {
+      // generate new columnar segment
+      val newMetaEntry = new LoadMetadataDetails
+      carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+      CarbonLoaderUtil.populateNewLoadMetaEntry(
+        newMetaEntry,
+        SegmentStatus.INSERT_IN_PROGRESS,
+        carbonLoadModel.getFactTimeStamp,
+        false)
+      CarbonLoaderUtil.recordLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
+      // convert a streaming segment to columnar segment
+      val status = new StreamHandoffRDD(
+        sqlContext.sparkContext,
+        new HandoffResultImpl(),
+        carbonLoadModel,
+        handoffSegmenId).collect()
+
+      status.foreach { x =>
+        if (!x._2) {
+          loadStatus = SegmentStatus.LOAD_FAILURE
+        }
+      }
+    } catch {
+      case ex =>
+        loadStatus = SegmentStatus.LOAD_FAILURE
+        errorMessage = errorMessage + ": " + ex.getCause.getMessage
+        LOGGER.error(errorMessage)
+        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
+    }
+
+    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
+      CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+      LOGGER.info("********starting clean up**********")
+      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+      LOGGER.info("********clean up done**********")
+      LOGGER.audit(s"Handoff is failed for " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      LOGGER.warn("Cannot write load metadata file as handoff failed")
+      throw new Exception(errorMessage)
+    }
+
+    if (loadStatus == SegmentStatus.SUCCESS) {
+      val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
+      if (!done) {
+        val errorMessage = "Handoff failed due to failure in table status updation."
+        LOGGER.audit("Handoff is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        LOGGER.error("Handoff failed due to failure in table status updation.")
+        throw new Exception(errorMessage)
+      }
+      done
+    }
+
+  }
+
+  /**
+   * Updates the table status file after a successful handoff: the new columnar segment
+   * (identified by loadModel.getSegmentId) is marked SUCCESS and the source streaming
+   * segment is marked COMPACTED, under the table status lock.
+   *
+   * @param handoffSegmentId id of the streaming segment that was handed off
+   * @param loadModel        load model carrying the new segment id and table identity
+   * @return true if the table status file was rewritten; false if the table status lock
+   *         could not be acquired
+   * @throws Exception if either segment entry is missing from the table status file
+   */
+  private def updateLoadMetadata(
+      handoffSegmentId: String,
+      loadModel: CarbonLoadModel
+  ): Boolean = {
+    var status = false
+    val metaDataFilepath =
+      loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
+    val identifier =
+      loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
+    val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+    val fileType = FileFactory.getFileType(metadataPath)
+    // the metadata directory may not exist yet for a freshly created table
+    if (!FileFactory.isFileExist(metadataPath, fileType)) {
+      FileFactory.mkdirs(metadataPath, fileType)
+    }
+    val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+    val segmentStatusManager = new SegmentStatusManager(identifier)
+    val carbonLock = segmentStatusManager.getTableStatusLock()
+    // track acquisition so the finally block never releases a lock we never held
+    var locked = false
+    try {
+      locked = carbonLock.lockWithRetries()
+      if (locked) {
+        LOGGER.info(
+          "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+          + " for table status updation")
+        val listOfLoadFolderDetailsArray =
+          SegmentStatusManager.readLoadMetadata(metaDataFilepath)
+
+        // update new columnar segment to success status
+        val newSegment =
+          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId))
+        if (newSegment.isEmpty) {
+          throw new Exception("Failed to update table status for new segment")
+        } else {
+          newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
+          newSegment.get.setLoadEndTime(System.currentTimeMillis())
+        }
+
+        // update streaming segment to compacted status
+        val streamSegment =
+          listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId))
+        if (streamSegment.isEmpty) {
+          throw new Exception("Failed to update table status for streaming segment")
+        } else {
+          streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
+        }
+
+        // refresh table status file
+        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray)
+        status = true
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation for table " +
+          loadModel.getDatabaseName() + "." + loadModel.getTableName())
+      }
+    } finally {
+      if (locked) {
+        if (carbonLock.unlock()) {
+          LOGGER.info("Table unlocked successfully after table status updation" +
+                      loadModel.getDatabaseName() + "." + loadModel.getTableName())
+        } else {
+          LOGGER.error("Unable to unlock Table lock for table" + loadModel.getDatabaseName() +
+                       "." + loadModel.getTableName() + " during table status updation")
+        }
+      }
+    }
+    // idiomatic Scala: last expression is the result (no explicit `return`)
+    status
+  }
+}


Mime
View raw message