carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [11/14] carbondata git commit: [CARBONDATA-2022]Fixed table alias issue in PreAggregate
Date Sat, 13 Jan 2018 08:10:40 GMT
[CARBONDATA-2022]Fixed table alias issue in PreAggregate

**Problem:** A query that uses a table alias does not hit the pre-aggregate table.
Solution: For a query with a table alias, the plan arrives as SubqueryAlias(alias, SubqueryAlias),
and this case was not handled in the transform query plan for pre-aggregate.

This closes #1794


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/913837fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/913837fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/913837fb

Branch: refs/heads/carbonstore
Commit: 913837fbcf6d51f21b4f1130158ea3281b7a89b3
Parents: 36b7982
Author: BJangir <babulaljangir111@gmail.com>
Authored: Thu Jan 11 15:30:59 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Jan 12 19:07:29 2018 +0530

----------------------------------------------------------------------
 .../TestPreAggregateTableSelection.scala        | 18 ++++-
 .../sql/hive/CarbonPreAggregateRules.scala      | 85 ++++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/913837fb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 2e0dcc4..9bbba3a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -240,12 +240,28 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll
{
     sql("CREATE TABLE grouptable(id int, name string, city string, age string) STORED BY"
+
         " 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='name,age')")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table grouptable")
-    sql("create datamap agg9 on table grouptable using 'preaggregate' as select sum(id) from
grouptable group by city")
+    sql(
+      "create datamap agg9 on table grouptable using 'preaggregate' as select sum(id) from
grouptable group by city")
     val df = sql("select sum(id) from grouptable group by city")
     preAggTableValidator(df.queryExecution.analyzed, "grouptable_agg9")
     checkAnswer(df, Seq(Row(3), Row(3), Row(4), Row(7)))
   }
 
+  test("test PreAggregate table selection 30") {
+    val df = sql("select a.name from mainTable a group by a.name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 31") {
+    val df = sql("select a.name as newName from mainTable a group by a.name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 32") {
+    val df = sql("select a.name as newName from mainTable a  where a.name='vishal' group
by a.name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists lineitem")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/913837fb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 8811a4e..de554c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -260,6 +260,48 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends
Rule
    */
   def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
     val updatedPlan = logicalPlan.transform {
+      case agg@Aggregate(
+        grExp,
+        aggExp,
+        CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
+        val carbonTable = getCarbonTable(l)
+        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+        val isValidPlan = extractQueryColumnsFromAggExpression(
+          grExp,
+          aggExp,
+          carbonTable,
+          list,
+          aggregateExpressions)
+        if(isValidPlan) {
+          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+            aggregateExpressions,
+            carbonTable,
+            agg)
+          if(null != aggDataMapSchema && null!= childPlan) {
+            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+            val (updatedGroupExp, updatedAggExp, newChild, None) =
+              getUpdatedExpressions(grExp,
+                aggExp,
+                child,
+                None,
+                aggDataMapSchema,
+                attributes,
+                childPlan,
+                carbonTable,
+                agg)
+            Aggregate(updatedGroupExp,
+              updatedAggExp,
+              newChild)
+          } else {
+            agg
+          }
+        } else {
+          agg
+        }
       // case for aggregation query
       case agg@Aggregate(
         grExp,
@@ -352,6 +394,49 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends
Rule
         } else {
           agg
         }
+      case agg@Aggregate(
+      grExp,
+      aggExp,
+      Filter(expression, CarbonSubqueryAlias(_, child@CarbonSubqueryAlias(_, l: LogicalRelation))))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
+        val carbonTable = getCarbonTable(l)
+        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+        val isValidPlan = extractQueryColumnsFromAggExpression(
+          grExp,
+          aggExp,
+          carbonTable,
+          list,
+          aggregateExpressions)
+        if(isValidPlan) {
+          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+            aggregateExpressions,
+            carbonTable,
+            agg)
+          if(null != aggDataMapSchema && null!= childPlan) {
+            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+            val (updatedGroupExp, updatedAggExp, newChild, None) =
+              getUpdatedExpressions(grExp,
+                aggExp,
+                child,
+                None,
+                aggDataMapSchema,
+                attributes,
+                childPlan,
+                carbonTable,
+                agg)
+            Aggregate(updatedGroupExp,
+              updatedAggExp,
+              newChild)
+          } else {
+            agg
+          }
+        } else {
+          agg
+        }
+
     }
     updatedPlan
   }


Mime
View raw message