carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-2069] Restrict create datamap when load is in progress
Date Sun, 28 Jan 2018 07:31:19 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master e40963254 -> 2f4dbb694


[CARBONDATA-2069] Restrict create datamap when load is in progress

Problem:
1. Load data into the main table
2. Create a datamap in parallel
The pre-aggregate table will not have any data even though the data load succeeds for the main table. This
will make the pre-aggregate table inconsistent with the main table.

Solution: Restrict creation of a pre-aggregate table while a load is in progress on the main table

This closes #1850


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f4dbb69
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f4dbb69
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f4dbb69

Branch: refs/heads/master
Commit: 2f4dbb694c512aaaaa56ff1f0ed576dc50ac9f9d
Parents: e409632
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Tue Jan 23 18:52:48 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Sun Jan 28 13:01:03 2018 +0530

----------------------------------------------------------------------
 .../CreatePreAggregateTableCommand.scala        | 25 ++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f4dbb69/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index c5340c2..a75a06f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -30,8 +30,10 @@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -180,9 +182,24 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. If not then
no
     // need to fire load for pre-aggregate table. Therefore reading the load details for
PARENT
     // table.
+    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
-      .nonEmpty
-    if (loadAvailable) {
+    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS
||
+      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+      throw new UnsupportedOperationException(
+        "Cannot create pre-aggregate table when insert is in progress on main table")
+    } else if (loadAvailable.nonEmpty) {
+      val updatedQuery = if (timeSeriesFunction.isDefined) {
+        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
+          .filter(p => p.getDataMapName
+            .equalsIgnoreCase(dataMapName)).head
+          .asInstanceOf[AggregationDataMapSchema]
+        PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
+          parentTable.getTableName,
+          parentTable.getDatabaseName)
+      } else {
+        queryString
+      }
       // Passing segmentToLoad as * because we want to load all the segments into the
       // pre-aggregate table even if the user has set some segments on the parent table.
       loadCommand.dataFrame = Some(PreAggregateUtil


Mime
View raw message