carbondata-commits mailing list archives

From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-1522] Support preaggregate table creation and loading on streaming tables
Date Fri, 30 Mar 2018 09:51:10 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 05eda0014 -> 7b56126b7


[CARBONDATA-1522] Support preaggregate table creation and loading on streaming tables

1. Added support to create a preaggregate datamap on a streaming table
2. Added support to load data into the datamap after handoff is fired for the streaming table
3. Added transaction support for datamap loading

This closes #2084
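
A minimal usage sketch of the feature, modelled on the new tests in TestStreamingTableOperation below. The SparkSession value `spark`, the table name `source` and the datamap name `agg1` are illustrative only and not part of this commit:

// Assumes a SparkSession created through CarbonSession, here called `spark`.
// 1. Create a streaming table.
spark.sql(
  "CREATE TABLE source(c1 STRING, c2 INT) STORED BY 'carbondata' " +
  "TBLPROPERTIES('streaming'='true')")
// 2. Preaggregate (and timeseries) datamaps can now be created on the streaming table.
spark.sql(
  "CREATE DATAMAP agg1 ON TABLE source USING 'preaggregate' " +
  "AS SELECT c1, SUM(c2) FROM source GROUP BY c1")
// 3. The datamap table stays empty while rows live only in streaming segments;
//    it is loaded once handoff converts streaming segments into batch segments.
spark.sql("ALTER TABLE source FINISH STREAMING")
spark.sql("ALTER TABLE source COMPACT 'streaming'")
// 4. The child table <parent>_<datamap> now reflects the handed-off data.
spark.sql("SELECT * FROM source_agg1").show()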


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7b56126b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7b56126b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7b56126b

Branch: refs/heads/master
Commit: 7b56126b726bfc47504513d3994a2849f6412016
Parents: 05eda00
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Tue Mar 20 11:41:17 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Fri Mar 30 14:49:06 2018 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableInputFormat.java      |   1 -
 .../CarbonIndexFileMergeTestCase.scala          |   3 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |  21 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   1 +
 .../datamap/CarbonCreateDataMapCommand.scala    |  24 +-
 .../CarbonAlterTableCompactionCommand.scala     |  15 +-
 .../preaaggregate/PreAggregateListeners.scala   |  77 +++---
 .../preaaggregate/PreAggregateTableHelper.scala |  12 +-
 .../TestStreamingTableOperation.scala           | 263 ++++++++++++++++++-
 9 files changed, 345 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index beb51cb..46afb36 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -151,7 +151,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
-
       List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments(),
           true);
       if (filteredSegmentToAccess.size() == 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 9cb30b9..99b536c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -183,12 +183,13 @@ class CarbonIndexFileMergeTestCase
     sql("create table mitable(id int, issue date) stored by 'carbondata'")
     sql("insert into table mitable select '1','2000-02-01'")
     val table = CarbonMetadata.getInstance().getCarbonTable("default", "mitable")
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     sql("update mitable set(id)=(2) where issue = '2000-02-01'").show()
     sql("clean files for table mitable")
     sql("select * from mitable").show()
   }
+
   private def getIndexFileCount(tableName: String, segment: String): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 50102f1..f57d70c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.rdd
 
 import java.text.SimpleDateFormat
 import java.util
-import java.util.Date
+import java.util.{Date, UUID}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
@@ -218,8 +218,8 @@ object StreamHandoffRDD {
 
   def iterateStreamingHandoff(
       carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession
-  ): Unit = {
+      operationContext: OperationContext,
+      sparkSession: SparkSession): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = carbonTable.getAbsoluteTableIdentifier
     var continueHandoff = false
@@ -253,9 +253,11 @@ object StreamHandoffRDD {
             if (continueHandoff) {
               // handoff a streaming segment
               val loadMetadataDetail = streamSegments(0)
+              carbonLoadModel.setSegmentId(loadMetadataDetail.getLoadName)
               executeStreamingHandoff(
                 carbonLoadModel,
                 sparkSession,
+                operationContext,
                 loadMetadataDetail.getLoadName
               )
             }
@@ -276,16 +278,17 @@ object StreamHandoffRDD {
    */
   def startStreamingHandoffThread(
       carbonLoadModel: CarbonLoadModel,
+      operationContext: OperationContext,
       sparkSession: SparkSession,
       isDDL: Boolean
   ): Unit = {
     if (isDDL) {
-      iterateStreamingHandoff(carbonLoadModel, sparkSession)
+      iterateStreamingHandoff(carbonLoadModel, operationContext, sparkSession)
     } else {
       // start a new thread to execute streaming segment handoff
       val handoffThread = new Thread() {
         override def run(): Unit = {
-          iterateStreamingHandoff(carbonLoadModel, sparkSession)
+          iterateStreamingHandoff(carbonLoadModel, operationContext, sparkSession)
         }
       }
       handoffThread.start()
@@ -298,8 +301,8 @@ object StreamHandoffRDD {
   def executeStreamingHandoff(
       carbonLoadModel: CarbonLoadModel,
       sparkSession: SparkSession,
-      handoffSegmenId: String
-  ): Unit = {
+      operationContext: OperationContext,
+      handoffSegmenId: String): Unit = {
     var loadStatus = SegmentStatus.SUCCESS
     var errorMessage: String = "Handoff failure"
     try {
@@ -344,15 +347,13 @@ object StreamHandoffRDD {
     }
 
     if (loadStatus == SegmentStatus.SUCCESS) {
-      val operationContext = new OperationContext()
+      operationContext.setProperty("uuid", UUID.randomUUID().toString)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
         new LoadTablePreStatusUpdateEvent(
           carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
           carbonLoadModel)
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
       val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
-
       val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
         new LoadTablePostStatusUpdateEvent(carbonLoadModel)
       OperationListenerBus.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 402bc4b..e7ddebc 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -176,6 +176,7 @@ class CarbonAppendableStreamSink(
       if (enableAutoHandoff) {
         StreamHandoffRDD.startStreamingHandoffThread(
           carbonLoadModel,
+          new OperationContext,
           sparkSession,
           false)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 333dde6..91ff4d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandExcept
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapProvider
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.datamap.DataMapManager
 
@@ -49,17 +50,12 @@ case class CarbonCreateDataMapCommand(
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     // since streaming segment does not support building index and pre-aggregate yet,
     // so streaming table does not support create datamap
-    mainTable =
-      tableIdentifier match {
-        case Some(table) =>
-          CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
-        case _ => null
-      }
-    if (mainTable != null && mainTable.isStreamingTable) {
-      throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
+    mainTable = tableIdentifier match {
+      case Some(table) =>
+        CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+      case _ => null
     }
-
-    if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {
+    if (mainTable.getDataMapSchema(dataMapName) != null) {
       if (!ifNotExistsSet) {
        throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
       } else {
@@ -68,6 +64,14 @@ case class CarbonCreateDataMapCommand(
     }
 
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+    if (mainTable.isStreamingTable &&
+        !(dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)
+          || dataMapSchema.getProviderName
+            .equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString))) {
+      throw new MalformedCarbonCommandException(s"Streaming table does not support creating ${
+        dataMapSchema.getProviderName
+      } datamap")
+    }
     dataMapSchema.setProperties(new java.util.HashMap[String, String](
       dmProperties.map(x => (x._1.trim, x._2.trim)).asJava))
     dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema, sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e60a583..bccc4dd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -79,7 +79,15 @@ case class CarbonAlterTableCompactionCommand(
     }
     if (CarbonUtil.hasAggregationDataMap(table) ||
        (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
-      val loadMetadataEvent = new LoadMetadataEvent(table, true)
+      // If the compaction request is of 'streaming' type then we need to generate loadCommands
+      // for all the child datamaps in the LoadMetadataEvent. Therefore setting isCompaction=false.
+      // If set to true then only loadCommands for compaction will be created.
+      val loadMetadataEvent =
+        if (alterTableModel.compactionType.equalsIgnoreCase(CompactionType.STREAMING.name())) {
+          new LoadMetadataEvent(table, false)
+        } else {
+          new LoadMetadataEvent(table, true)
+        }
       OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
     }
     Seq.empty
@@ -176,6 +184,7 @@ case class CarbonAlterTableCompactionCommand(
     if (compactionType == CompactionType.STREAMING) {
       StreamHandoffRDD.startStreamingHandoffThread(
         carbonLoadModel,
+        operationContext,
         sqlContext.sparkSession, true)
       return
     }
@@ -183,6 +192,7 @@ case class CarbonAlterTableCompactionCommand(
     if (compactionType == CompactionType.CLOSE_STREAMING) {
       closeStreamingTable(
         carbonLoadModel,
+        operationContext,
         sqlContext.sparkSession)
       return
     }
@@ -263,6 +273,7 @@ case class CarbonAlterTableCompactionCommand(
 
   def closeStreamingTable(
       carbonLoadModel: CarbonLoadModel,
+      operationContext: OperationContext,
       sparkSession: SparkSession
   ): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -276,7 +287,7 @@ case class CarbonAlterTableCompactionCommand(
         // 2. convert segment status from "streaming" to "streaming finish"
         StreamSegment.finishStreaming(carbonTable)
         // 3. iterate to handoff all streaming segment to batch segment
-        StreamHandoffRDD.iterateStreamingHandoff(carbonLoadModel, sparkSession)
+        StreamHandoffRDD.iterateStreamingHandoff(carbonLoadModel, operationContext, sparkSession)
         val tableIdentifier =
           new TableIdentifier(carbonTable.getTableName, Option(carbonTable.getDatabaseName))
         // 4. modify table to normal table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index a516d1b..8740a2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusMan
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
 /**
  * below class will be used to create load command for compaction
@@ -199,43 +199,49 @@ object LoadPostAggregateListener extends OperationEventListener {
    * @param event
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
+    val carbonLoadModelOption =
+      event match {
+        case e: LoadTablePreStatusUpdateEvent => Some(e.getCarbonLoadModel)
+        case e: LoadTablePostExecutionEvent => Some(e.getCarbonLoadModel)
+        case _ => None
+      }
     val sparkSession = SparkSession.getActiveSession.get
-    val carbonLoadModel = loadEvent.getCarbonLoadModel
-    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    if (CarbonUtil.hasAggregationDataMap(table)) {
-      // getting all the aggergate datamap schema
-      val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
-        .filter(_.isInstanceOf[AggregationDataMapSchema])
-        .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
-      // sorting the datamap for timeseries rollup
-      val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
-      for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
-        val childLoadCommand = operationContext
-          .getProperty(dataMapSchema.getChildSchema.getTableName)
-          .asInstanceOf[CarbonLoadDataCommand]
-        childLoadCommand.dataFrame = Some(PreAggregateUtil
-          .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
-        val isOverwrite =
-          operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
-        childLoadCommand.operationContext = operationContext
-        val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent")
-        val (parentTableIdentifier, segmentToLoad) =
-          if (timeseriesParent.isDefined && timeseriesParent.get.nonEmpty) {
-            val (parentTableDatabase, parentTableName) =
-              (timeseriesParent.get.split('.')(0), timeseriesParent.get.split('.')(1))
-            (TableIdentifier(parentTableName, Some(parentTableDatabase)),
-            operationContext.getProperty(
-              s"${parentTableDatabase}_${parentTableName}_Segment").toString)
-        } else {
-            val currentSegmentFile = operationContext.getProperty("current.segmentfile")
-            val segment = if (currentSegmentFile != null) {
-              new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
+    if (carbonLoadModelOption.isDefined) {
+      val carbonLoadModel = carbonLoadModelOption.get
+      val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      if (CarbonUtil.hasAggregationDataMap(table)) {
+        // getting all the aggergate datamap schema
+        val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
+          .filter(_.isInstanceOf[AggregationDataMapSchema])
+          .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
+        // sorting the datamap for timeseries rollup
+        val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
+        for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
+          val childLoadCommand = operationContext
+            .getProperty(dataMapSchema.getChildSchema.getTableName)
+            .asInstanceOf[CarbonLoadDataCommand]
+          childLoadCommand.dataFrame = Some(PreAggregateUtil
+            .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
+          val isOverwrite =
+            operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
+          childLoadCommand.operationContext = operationContext
+          val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent")
+          val (parentTableIdentifier, segmentToLoad) =
+            if (timeseriesParent.isDefined && timeseriesParent.get.nonEmpty) {
+              val (parentTableDatabase, parentTableName) =
+                (timeseriesParent.get.split('.')(0), timeseriesParent.get.split('.')(1))
+              (TableIdentifier(parentTableName, Some(parentTableDatabase)),
+                operationContext.getProperty(
+                  s"${ parentTableDatabase }_${ parentTableName }_Segment").toString)
             } else {
-              Segment.toSegment(carbonLoadModel.getSegmentId)
+              val currentSegmentFile = operationContext.getProperty("current.segmentfile")
+              val segment = if (currentSegmentFile != null) {
+                new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
+              } else {
+                Segment.toSegment(carbonLoadModel.getSegmentId)
+              }
+              (TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
             }
-            (TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
-        }
 
         PreAggregateUtil.startDataLoadForDataMap(
         parentTableIdentifier,
@@ -247,6 +253,7 @@ object LoadPostAggregateListener extends OperationEventListener {
         }
       }
     }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index d89aa5b..71545e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 
 /**
  * Below helper class will be used to create pre-aggregate table
@@ -53,6 +54,8 @@ case class PreAggregateTableHelper(
 
   var loadCommand: CarbonLoadDataCommand = _
 
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   def initMeta(sparkSession: SparkSession): Seq[Row] = {
     val dmProperties = dataMapProperties.asScala
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
@@ -176,9 +179,12 @@ case class PreAggregateTableHelper(
 
     if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
       throw new UnsupportedOperationException(
-        "Cannot create pre-aggregate table when insert is in progress on main table")
+        "Cannot create pre-aggregate table when insert is in progress on parent table")
     }
+    // check if any segment if available for load in the parent table
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
+      .filter(segment => segment.getSegmentStatus == SegmentStatus.SUCCESS ||
+                         segment.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS)
     if (loadAvailable.nonEmpty) {
       // Passing segmentToLoad as * because we want to load all the segments into the
       // pre-aggregate table even if the user has set some segments on the parent table.
@@ -191,6 +197,8 @@ case class PreAggregateTableHelper(
         loadCommand,
         isOverwrite = false,
         sparkSession)
+    } else {
+      LOGGER.info(s"No segment available for load in table ${parentTable.getTableUniqueName}")
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b56126b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 3719be9..9761671 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -34,6 +34,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
@@ -111,6 +112,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     // 19. block streaming on 'preaggregate' main table
     createTable(tableName = "agg_table_block", streaming = false, withBatchLoad = false)
+
+    createTable(tableName = "agg_table", streaming = true, withBatchLoad = false)
+
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
   }
 
   test("validate streaming property") {
@@ -249,6 +256,243 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assertResult(exceptedRow)(row)
   }
 
+  test("test preaggregate table creation on streaming table without handoff") {
+    val identifier = new TableIdentifier("agg_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table"),
+      Seq(Row(10)))
+    sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name")
+    // No data should be loaded into aggregate table as hand-off is not yet fired
+    checkAnswer(sql("select * from agg_table_p1"), Seq())
+  }
+
+  test("test if data is loaded into preaggregate after handoff is fired") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table2"),
+      Seq(Row(10)))
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    sql("create datamap p2 on table agg_table2 using 'preaggregate' as select name, avg(salary) from agg_table2 group by name")
+    sql("create datamap p3 on table agg_table2 using 'preaggregate' as select name, min(salary) from agg_table2 group by name")
+    sql("create datamap p4 on table agg_table2 using 'preaggregate' as select name, max(salary) from agg_table2 group by name")
+    sql("create datamap p5 on table agg_table2 using 'preaggregate' as select name, count(salary) from agg_table2 group by name")
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    // Data should be loaded into aggregate table as hand-off is fired
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+    checkAnswer(sql("select * from agg_table2_p2"),
+      Seq(
+        Row("name_10", 200000.0, 2.0),
+        Row("name_11", 220000.0, 2.0),
+        Row("name_12", 240000.0, 2.0),
+        Row("name_13", 260000.0, 2.0),
+        Row("name_14", 280000.0, 2.0)))
+    checkAnswer(sql("select * from agg_table2_p3"),
+      Seq(
+        Row("name_10", 100000.0),
+        Row("name_11", 110000.0),
+        Row("name_12", 120000.0),
+        Row("name_13", 130000.0),
+        Row("name_14", 140000.0)))
+    checkAnswer(sql("select * from agg_table2_p4"),
+      Seq(
+        Row("name_10", 100000.0),
+        Row("name_11", 110000.0),
+        Row("name_12", 120000.0),
+        Row("name_13", 130000.0),
+        Row("name_14", 140000.0)))
+    checkAnswer(sql("select * from agg_table2_p5"),
+      Seq(
+        Row("name_10", 2.0),
+        Row("name_11", 2.0),
+        Row("name_12", 2.0),
+        Row("name_13", 2.0),
+        Row("name_14", 2.0)))
+    sql("drop table agg_table2")
+  }
+
+  test("test if timeseries load is successful when created on streaming table") {
+    sql("drop table if exists timeseries_table")
+    createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("timeseries_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE timeseries_table
+         | USING '${TIMESERIES.toString}'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='register',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT register, SUM(id) FROM timeseries_table
+         | GROUP BY register
+       """.stripMargin)
+    sql("alter table timeseries_table finish streaming")
+    sql("alter table timeseries_table compact 'streaming'")
+    checkAnswer( sql("select * FROM timeseries_table_agg0_second"), Seq(Row(Timestamp.valueOf("2010-01-01 10:01:01.0"), 120)))
+  }
+
+  test("test if timeseries load is successful when created on streaming table with day granularity") {
+    sql("drop table if exists timeseries_table")
+    createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("timeseries_table", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE timeseries_table
+         | USING '${TIMESERIES.toString}'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='register',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT register, SUM(id) FROM timeseries_table
+         | GROUP BY register
+       """.stripMargin)
+    sql("alter table timeseries_table finish streaming")
+    sql("alter table timeseries_table compact 'streaming'")
+    checkAnswer( sql("select * FROM timeseries_table_agg0_day"), Seq(Row(Timestamp.valueOf("2010-01-01 00:00:00.0"), 120)))
+  }
+
+  test("test if minor compaction is successful for streaming and preaggregate tables") {
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    sql("create datamap p2 on table agg_table2 using 'preaggregate' as select name, min(salary) from agg_table2 group by name")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    sql("alter table agg_table2 compact 'minor'")
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 800000.0),
+        Row("name_11", 880000.0),
+        Row("name_12", 960000.0),
+        Row("name_13", 1040000.0),
+        Row("name_14", 1120000.0)))
+    assert(sql("show segments for table agg_table2").collect().map(_.get(0)).contains("1.1"))
+    assert(sql("show segments for table agg_table2_p1").collect().map(_.get(0)).contains("0.1"))
+    assert(sql("show segments for table agg_table2_p2").collect().map(_.get(0)).contains("0.1"))
+  }
+
+  test("test if major compaction is successful for streaming and preaggregate tables") {
+    sql("drop table if exists agg_table2")
+    createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false)
+    sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    loadData()
+    sql("alter table agg_table2 finish streaming")
+    sql("alter table agg_table2 compact 'streaming'")
+    sql("alter table agg_table2 compact 'major'")
+    checkAnswer(sql("select * from agg_table2_p1"),
+      Seq(
+        Row("name_10", 800000.0),
+        Row("name_11", 880000.0),
+        Row("name_12", 960000.0),
+        Row("name_13", 1040000.0),
+        Row("name_14", 1120000.0)))
+    assert(sql("show segments for table agg_table2").collect().map(_.get(0)).contains("1.1"))
+    assert(sql("show segments for table agg_table2_p1").collect().map(_.get(0)).contains("0.1"))
+  }
+
+  def loadData() {
+    val identifier = new TableIdentifier("agg_table2", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(2000)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    Thread.sleep(5000)
+    thread.interrupt()
+  }
+
+  test("test if data is loaded in aggregate table after handoff is done for streaming table") {
+    createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false)
+    val identifier = new TableIdentifier("agg_table3", Option("streaming"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val csvDataDir = new File("target/csvdatanew").getCanonicalPath
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
+    generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append)
+    // streaming ingest 10 rows
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming.agg_table3"),
+      Seq(Row(10)))
+    sql("alter table agg_table3 finish streaming")
+    sql("alter table agg_table3 compact 'streaming'")
+    sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name")
+    // Data should be loaded into aggregate table as hand-off is fired
+    checkAnswer(sql("select * from agg_table3_p1"),
+      Seq(
+        Row("name_10", 200000.0),
+        Row("name_11", 220000.0),
+        Row("name_12", 240000.0),
+        Row("name_13", 260000.0),
+        Row("name_14", 280000.0)))
+  }
+
   // bad records
   test("streaming table with bad records action: fail") {
     executeStreamingIngest(
@@ -921,8 +1165,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       generateBadRecords = false,
       badRecordAction = "force",
       handoffSize = 1,
-      autoHandoff = false
-    )
+      autoHandoff = false)
     val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect()
     sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before " +
         s"'2999-10-01 01:00:00'")
@@ -1156,13 +1399,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("do not support creating datamap on streaming table") {
-    assert(
-      intercept[MalformedCarbonCommandException](
-        sql("CREATE DATAMAP datamap ON TABLE source " +
-            "USING 'preaggregate'" +
-            " AS SELECT c1, sum(c2) FROM source GROUP BY c1")
-      ).getMessage.contains("Streaming table does not support creating datamap"))
+  test("support creating datamap on streaming table") {
+    sql("CREATE DATAMAP datamap ON TABLE source " +
+        "USING 'preaggregate'" +
+        " AS SELECT c1, sum(c2) FROM source GROUP BY c1")
   }
 
   test("check streaming property of table") {
@@ -1338,7 +1578,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       spark: SparkSession,
       idStart: Int,
       rowNums: Int,
-      csvDirPath: String): Unit = {
+      csvDirPath: String,
+      saveMode: SaveMode = SaveMode.Overwrite): Unit = {
     // Create csv data frame file
     val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
       .map { id =>
@@ -1358,7 +1599,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
     csvDataDF.write
       .option("header", "false")
-      .mode(SaveMode.Overwrite)
+      .mode(saveMode)
       .csv(csvDirPath)
   }
 

