carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kumarvisha...@apache.org
Subject carbondata git commit: [CARBONDATA-1743] fix conurrent pre-agg creation and query
Date Tue, 19 Dec 2017 13:59:22 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 804ddb76f -> e6497e14e


[CARBONDATA-1743] fix conurrent pre-agg creation and query

When a preaggregate table is created and load is in progress the aggregate query on main table
tries to get the data from pre-aggregate table which used to give empty results.If the size
of preaggregate table is 0 meaning no data is loaded then hit the main table instead of aggregate
table

This closes #1521


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e6497e14
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e6497e14
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e6497e14

Branch: refs/heads/master
Commit: e6497e14e6454f0990354f542e0e7f2d888b9725
Parents: 804ddb7
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Fri Nov 17 12:13:25 2017 +0530
Committer: kumarvishal <kumarvishal.1802@gmail.com>
Committed: Tue Dec 19 19:29:03 2017 +0530

----------------------------------------------------------------------
 .../TestPreAggregateTableSelection.scala        | 17 ++++-----
 .../TestTimeseriesTableSelection.scala          | 32 +++++++++--------
 .../sql/hive/CarbonPreAggregateRules.scala      | 38 ++++++++++++--------
 .../apache/spark/sql/hive/CarbonRelation.scala  | 19 ++++++----
 4 files changed, 63 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 5b36826..7157b8a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -27,7 +27,14 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll
{
 
   override def beforeAll: Unit = {
     sql("drop table if exists mainTable")
-    sql("drop table if exists lineitem")
+    sql("drop table if exists agg0")
+    sql("drop table if exists agg1")
+    sql("drop table if exists agg2")
+    sql("drop table if exists agg3")
+    sql("drop table if exists agg4")
+    sql("drop table if exists agg5")
+    sql("drop table if exists agg6")
+    sql("drop table if exists agg7")
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from
mainTable group by name")
     sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age)
from mainTable group by name")
@@ -39,8 +46,6 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll
{
     sql("create datamap agg7 on table mainTable using 'preaggregate' as select name,max(age)
from mainTable group by name")
     sql("create datamap agg8 on table maintable using 'preaggregate' as select name, sum(id),
avg(id) from maintable group by name")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
-    sql("create table if not exists lineitem(L_SHIPDATE string,L_SHIPMODE string,L_SHIPINSTRUCT
string,L_RETURNFLAG string,L_RECEIPTDATE string,L_ORDERKEY string,L_PARTKEY string,L_SUPPKEY
string,L_LINENUMBER int,L_QUANTITY double,L_EXTENDEDPRICE double,L_DISCOUNT double,L_TAX double,L_LINESTATUS
string,L_COMMITDATE string,L_COMMENT string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES
('table_blocksize'='128','NO_INVERTED_INDEX'='L_SHIPDATE,L_SHIPMODE,L_SHIPINSTRUCT,L_RETURNFLAG,L_RECEIPTDATE,L_ORDERKEY,L_PARTKEY,L_SUPPKEY','sort_columns'='')")
-    sql("create datamap agr_lineitem ON TABLE lineitem USING 'preaggregate' as select L_RETURNFLAG,L_LINESTATUS,sum
(L_QUANTITY),sum(L_EXTENDEDPRICE) from lineitem group by L_RETURNFLAG, L_LINESTATUS")
   }
 
   test("test sum and avg on same column should give proper results") {
@@ -138,11 +143,7 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll
{
     val df = sql("select count(id) from mainTable")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
   }
-
-  test("test PreAggregate table selection 19") {
-    val df = sql("select L_RETURNFLAG,L_LINESTATUS,sum(L_QUANTITY),sum(L_EXTENDEDPRICE) from
lineitem group by L_RETURNFLAG, L_LINESTATUS")
-    preAggTableValidator(df.queryExecution.analyzed, "lineitem_agr_lineitem")
-  }
+  
   test("test PreAggregate table selection 20") {
     val df = sql("select name from mainTable group by name order by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index 0990f87..725ac24 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -28,71 +28,73 @@ class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll
{
 
   override def beforeAll: Unit = {
     sql("drop table if exists mainTable")
-    sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED
BY 'org.apache.carbondata.format'")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime',
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select dataTime,
sum(age) from mainTable group by dataTime")
+    sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime',
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime,
sum(age) from mainTable group by mytime")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
   }
+
   test("test PreAggregate table selection 1") {
-    val df = sql("select dataTime from mainTable group by dataTime")
+    val df = sql("select mytime from mainTable group by mytime")
     preAggTableValidator(df.queryExecution.analyzed, "maintable")
   }
 
   test("test PreAggregate table selection 2") {
-    val df = sql("select timeseries(dataTime,'hour') from mainTable group by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour') from mainTable group by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 3") {
-    val df = sql("select timeseries(dataTime,'milli') from mainTable group by timeseries(dataTime,'milli')")
+    val df = sql("select timeseries(mytime,'milli') from mainTable group by timeseries(mytime,'milli')")
     preAggTableValidator(df.queryExecution.analyzed, "maintable")
   }
 
   test("test PreAggregate table selection 4") {
-    val df = sql("select timeseries(dataTime,'year') from mainTable group by timeseries(dataTime,'year')")
+    val df = sql("select timeseries(mytime,'year') from mainTable group by timeseries(mytime,'year')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_year")
   }
 
   test("test PreAggregate table selection 5") {
-    val df = sql("select timeseries(dataTime,'day') from mainTable group by timeseries(dataTime,'day')")
+    val df = sql("select timeseries(mytime,'day') from mainTable group by timeseries(mytime,'day')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_day")
   }
 
   test("test PreAggregate table selection 6") {
-    val df = sql("select timeseries(dataTime,'month') from mainTable group by timeseries(dataTime,'month')")
+    val df = sql("select timeseries(mytime,'month') from mainTable group by timeseries(mytime,'month')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_month")
   }
 
   test("test PreAggregate table selection 7") {
-    val df = sql("select timeseries(dataTime,'minute') from mainTable group by timeseries(dataTime,'minute')")
+    val df = sql("select timeseries(mytime,'minute') from mainTable group by timeseries(mytime,'minute')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_minute")
   }
 
   test("test PreAggregate table selection 8") {
-    val df = sql("select timeseries(dataTime,'second') from mainTable group by timeseries(dataTime,'second')")
+    val df = sql("select timeseries(mytime,'second') from mainTable group by timeseries(mytime,'second')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_second")
   }
 
   test("test PreAggregate table selection 9") {
-    val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x'
group by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour') from mainTable where timeseries(mytime,'hour')='x'
group by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 10") {
-    val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x'
group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour') from mainTable where timeseries(mytime,'hour')='x'
group by timeseries(mytime,'hour') order by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 11") {
-    val df = sql("select timeseries(dataTime,'hour'),sum(age) from mainTable where timeseries(dataTime,'hour')='x'
group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour'),sum(age) from mainTable where timeseries(mytime,'hour')='x'
group by timeseries(mytime,'hour') order by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 12") {
-    val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable
where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour')as hourlevel,sum(age) as sum from mainTable
where timeseries(mytime,'hour')='x' group by timeseries(mytime,'hour') order by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 13") {
-    val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable
where timeseries(dataTime,'hour')='x' and name='vishal' group by timeseries(dataTime,'hour')
order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour')as hourlevel,sum(age) as sum from mainTable
where timeseries(mytime,'hour')='x' and name='vishal' group by timeseries(mytime,'hour') order
by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index c28cc44..76c39a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -296,21 +296,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession)
extends Rule
         val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
         // if it does not match with any pre aggregate table return the same plan
         if (!selectedDataMapSchemas.isEmpty) {
-          // sort the selected child schema based on size to select smallest pre aggregate
table
+          // filter the selected child schema based on size to select the pre-aggregate tables
+          // that are nonEmpty
           val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          val (aggDataMapSchema, carbonRelation, relation) =
-            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
-              val identifier = TableIdentifier(
-                selectedDataMapSchema.getRelationIdentifier.getTableName,
-                Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
-              val carbonRelation =
-                catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
-              val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
-              (selectedDataMapSchema, carbonRelation, relation)
-            }.minBy(f => f._2.sizeInBytes)
-          val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
-          // transform the query plan based on selected child schema
-          transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+          val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema
=>
+            val identifier = TableIdentifier(
+              selectedDataMapSchema.getRelationIdentifier.getTableName,
+              Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+            val carbonRelation =
+              catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+            val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
+            (selectedDataMapSchema, carbonRelation, relation)
+          }.filter(_._2.sizeInBytes != 0L)
+          if (relationBuffer.isEmpty) {
+            // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
+            // tables have date yet.
+            // In this case we would return the original plan so that the query hits the
parent
+            // table.
+            plan
+          } else {
+            // If the relationBuffer is nonEmpty then find the table with the minimum size.
+            val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
+            val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
+            // transform the query plan based on selected child schema
+            transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+          }
         } else {
           plan
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index aadce98..87be2d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -205,12 +206,18 @@ case class CarbonRelation(
       carbonTable.getAbsoluteTableIdentifier)
 
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getAbsoluteTableIdentifier).getPath
-      val fileType = FileFactory.getFileType(tablePath)
-      if(FileFactory.isFileExist(tablePath, fileType)) {
-        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
-        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+      if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+        .getValidAndInvalidSegments.getValidSegments.isEmpty) {
+        sizeInBytesLocalValue = 0L
+      } else {
+        val tablePath = CarbonStorePath.getCarbonTablePath(
+          carbonTable.getTablePath,
+          carbonTable.getCarbonTableIdentifier).getPath
+        val fileType = FileFactory.getFileType(tablePath)
+        if (FileFactory.isFileExist(tablePath, fileType)) {
+          tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
+          sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+        }
       }
     }
     sizeInBytesLocalValue


Mime
View raw message