carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1740][Pre-Aggregate] Fixed order by issue in case of preAggregate
Date Tue, 05 Dec 2017 02:54:06 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 77217b370 -> f70e6d700


[CARBONDATA-1740][Pre-Aggregate] Fixed order by issue in case of preAggregate

Problem: Order by queries fail when the table has a pre-aggregate table.
Solution: The pre-aggregate rules did not handle the order by scenario; this PR handles it.
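
For illustration, two of the query shapes that now route to a pre-aggregate
table, taken from the tests added in this commit (table and datamap names come
from the existing test setup):

    sql("select name from mainTable group by name order by name")            // selects maintable_agg0
    sql("select name, sum(age) from mainTable group by name order by name")  // selects maintable_agg1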

This closes #1544


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f70e6d70
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f70e6d70
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f70e6d70

Branch: refs/heads/master
Commit: f70e6d7008cb5300581da4307ef55ec444999577
Parents: 77217b3
Author: kumarvishal <kumarvishal.1802@gmail.com>
Authored: Mon Nov 20 16:36:11 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Dec 5 08:23:48 2017 +0530

----------------------------------------------------------------------
 .../schema/table/AggregationDataMapSchema.java  |   3 +-
 .../carbondata/core/preagg/QueryColumn.java     |  24 ++
 .../TestPreAggregateTableSelection.scala        |  40 ++
 .../sql/hive/CarbonPreAggregateRules.scala      | 408 +++++++++++++++++--
 .../src/main/spark2.1/CarbonSessionState.scala  |  41 +-
 5 files changed, 465 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 87c07f4..9bfb22c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -93,7 +93,8 @@ public class AggregationDataMapSchema extends DataMapSchema {
     for (ColumnSchema columnSchema : listOfColumns) {
       List<ParentColumnTableRelation> parentColumnTableRelations =
           columnSchema.getParentColumnTableRelations();
-      if (parentColumnTableRelations.get(0).getColumnName().equals(columName)) {
+      if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1
+          && parentColumnTableRelations.get(0).getColumnName().equals(columName)) {
         return columnSchema;
       }
     }
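
The guard above matters because the old code dereferenced the first parent
relation unconditionally. A minimal stand-alone sketch of the fixed check,
using a hypothetical stand-in type rather than the repo's
ParentColumnTableRelation:

    import java.util.{List => JList}

    // hypothetical stand-in for ParentColumnTableRelation
    case class ParentRelation(columnName: String)

    def matchesParent(relations: JList[ParentRelation], name: String): Boolean =
      // an unguarded relations.get(0) would throw when relations is null or empty
      null != relations && relations.size() == 1 &&
        relations.get(0).columnName.equals(name)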

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
index a62d556..c889716 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -67,4 +67,28 @@ public class QueryColumn {
   public boolean isFilterColumn() {
     return isFilterColumn;
   }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    QueryColumn that = (QueryColumn) o;
+    if (isFilterColumn != that.isFilterColumn) {
+      return false;
+    }
+    if (!columnSchema.equals(that.columnSchema)) {
+      return false;
+    }
+    return aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null;
+  }
+
+  @Override public int hashCode() {
+    int result = columnSchema.hashCode();
+    result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0);
+    result = 31 * result + (isFilterColumn ? 1 : 0);
+    return result;
+  }
 }
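
These overrides matter because the query rule below now collects QueryColumn
instances into a mutable.HashSet (replacing a ListBuffer), and set
deduplication relies on value-based equals/hashCode. A minimal sketch of the
idea, with a simplified stand-in for QueryColumn:

    import scala.collection.mutable

    // simplified stand-in: equality on (column name, aggFunction, isFilterColumn)
    case class Col(name: String, aggFunction: String = "", isFilterColumn: Boolean = false)

    val set = mutable.HashSet.empty[Col]
    set += Col("name")    // collected from the projection
    set += Col("name")    // collected again, e.g. from the order by clause
    assert(set.size == 1) // deduplicated by value equality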

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index c29beec..5dfe447 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -150,6 +150,46 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
     val df = sql("select L_RETURNFLAG,L_LINESTATUS,sum(L_QUANTITY),sum(L_EXTENDEDPRICE) from lineitem group by L_RETURNFLAG, L_LINESTATUS")
     preAggTableValidator(df.queryExecution.analyzed, "lineitem_agr_lineitem")
   }
+  test("test PreAggregate table selection 20") {
+    val df = sql("select name from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 21") {
+    val df = sql("select name as NewName from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 22") {
+    val df = sql("select name, sum(age) from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 23") {
+    val df = sql("select name as NewName, sum(age) as sum from mainTable group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 24") {
+    val df = sql("select name as NewName, sum(age) as sum from mainTable where name='vishal' group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+  }
+
+  test("test PreAggregate table selection 25") {
+    val df = sql("select name as NewName, sum(age) as sum from mainTable where city = 'Bangalore' group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable")
+  }
+
+  test("test PreAggregate table selection 26") {
+    val df = sql("select name from mainTable where name='vishal' group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
+  test("test PreAggregate table selection 27") {
+    val df = sql("select name as NewName from mainTable where name='vishal' group by name order by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+  }
+
 
   def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
     var isValidPlan = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 2875817..2b74ed7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -18,12 +18,12 @@
 package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.sql.CarbonExpressions.MatchCast
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -64,6 +65,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  * 3. Filter Expression rules.
  *    3.1 Updated filter expression attributes with child table attributes
  * 4. Update the Parent Logical relation with child Logical relation
+ * 5. Order By Query rules.
+ *    5.1 Update project list based on updated aggregate expression
+ *    5.2 Update sort order attributes based on pre aggregate table
  *
  * @param sparkSession
  * spark session
@@ -95,7 +99,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       plan
     } else {
       // create buffer to collect all the column and its metadata information
-      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
+      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
       var isValidPlan = true
       val carbonTable = plan match {
         // matching the plan based on supported plan
@@ -105,8 +109,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // When plan has grouping expression, aggregate expression
         // subquery
         case Aggregate(groupingExp,
-        aggregateExp,
-        CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+          aggregateExp,
+          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
@@ -124,9 +128,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // below case for handling filter query
         // When plan has grouping expression, aggregate expression
         // filter expression
-        case Aggregate(groupingExp, aggregateExp,
-        Filter(filterExp,
-        CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
+        case Aggregate(groupingExp,
+          aggregateExp,
+          Filter(filterExp,
+          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
           // only carbon query plan is supported checking whether logical relation is
           // is for carbon
           if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
@@ -167,6 +172,104 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             tableName,
             list)
           carbonTable
+        // case for handling aggregation, order by
+        case Project(projectList,
+          Sort(sortOrders,
+            _,
+            Aggregate(groupingExp,
+              aggregateExp,
+              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          if(isValidPlan) {
+            list ++
+            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
+          }
+          carbonTable
+        // case for handling aggregation, order by and filter
+        case Project(projectList,
+          Sort(sortOrders,
+            _,
+            Aggregate(groupingExp,
+              aggregateExp,
+              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          // TODO need to handle filter predicate subquery scenario
+//          isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
+          if (isValidPlan) {
+            list ++
+            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
+            filterExp.transform {
+              case attr: AttributeReference =>
+                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
+                attr
+            }
+          }
+          carbonTable
+        // case for handling aggregation with order by when only projection column exists
+        case Sort(sortOrders,
+          _,
+          Aggregate(groupingExp,
+            aggregateExp,
+            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          if(isValidPlan) {
+            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
+              carbonTable = carbonTable,
+              tableName = tableName)
+          }
+          carbonTable
+        // case for handling aggregation with order by and filter when only projection column exists
+        case Sort(sortOrders,
+          _,
+          Aggregate(groupingExp,
+            aggregateExp,
+            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
+          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+               .hasDataMapSchema =>
+          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+            aggregateExp,
+            carbonTable,
+            tableName,
+            list)
+          // TODO need to handle filter predicate subquery scenario
+//          isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
+          if(isValidPlan) {
+            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
+              carbonTable = carbonTable,
+              tableName = tableName)
+            filterExp.transform {
+              case attr: AttributeReference =>
+                list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
+                attr
+            }
+          }
+          carbonTable
         case _ =>
           isValidPlan = false
           null
@@ -176,11 +279,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         // getting all the projection columns
         val listProjectionColumn = list
           .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
+          .toList
         // getting all the filter columns
         val listFilterColumn = list
           .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
+          .toList
         // getting all the aggregation columns
         val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
+          .toList
         // create a query plan object which will be used to select the list of pre aggregate tables
         // matches with this plan
         val queryPlan = new QueryPlan(listProjectionColumn.asJava,
@@ -190,7 +296,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
         // select the list of valid child tables
         val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
-        // if it doesnot match with any pre aggregate table return the same plan
+        // if it does not match with any pre aggregate table return the same plan
         if (!selectedDataMapSchemas.isEmpty) {
           // sort the selected child schema based on size to select smallest pre aggregate table
           val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -216,6 +322,48 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 
   /**
+   * Below method will be used to extract columns from order by expression
+   * @param projectList
+   *                    project list from plan
+   * @param sortOrders
+   *                   sort order in plan
+   * @param carbonTable
+   *                    carbon table
+   * @param tableName
+   *                  table name
+   * @return query columns from expression
+   */
+  def extractQueryColumnForOrderBy(projectList: Option[Seq[NamedExpression]] = None,
+      sortOrders: Seq[SortOrder],
+      carbonTable: CarbonTable,
+      tableName: String): Seq[QueryColumn] = {
+    val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
+    if(projectList.isDefined) {
+      projectList.get.map {
+        proList =>
+          proList.transform {
+            case attr: AttributeReference =>
+              val queryColumn = getQueryColumn(attr.name, carbonTable, tableName)
+              if (null != queryColumn) {
+                list += queryColumn
+              }
+              attr
+          }
+      }
+    }
+    sortOrders.foreach { sortOrder =>
+        sortOrder.child match {
+          case attr: AttributeReference =>
+            val queryColumn = getQueryColumn(attr.name, carbonTable, tableName)
+            if (null != queryColumn) {
+              list += queryColumn
+            }
+        }
+    }
+    list
+  }
+
+  /**
    * Below method will be used to get the child attribute reference
    * based on parent name
    *
@@ -227,12 +375,16 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * child logical relation
    * @param aggFunction
    * aggregation function applied on child
+   * @param canBeNull
+   * when true, the returned child attribute may be null (lenient lookup);
+   * when false, a missing column raises an AnalysisException (strict validation)
    * @return child attribute reference
    */
   def getChildAttributeReference(dataMapSchema: DataMapSchema,
       attributeReference: AttributeReference,
       attributes: Seq[AttributeReference],
-      aggFunction: String = ""): AttributeReference = {
+      aggFunction: String = "",
+      canBeNull: Boolean = false): AttributeReference = {
     val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
     val columnSchema = if (aggFunction.isEmpty) {
       aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase)
@@ -242,11 +394,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     }
     // here column schema cannot be null, if it is null then aggregate table selection
     // logic has some problem
-    if (null == columnSchema) {
+    if (!canBeNull && null == columnSchema) {
       throw new AnalysisException("Column does not exists in Pre Aggregate table")
     }
-    // finding the child attribute from child logical relation
-    attributes.find(p => p.name.equals(columnSchema.getColumnName)).get
+    if(null == columnSchema && canBeNull) {
+      null
+    } else {
+      // finding the child attribute from child logical relation
+      attributes.find(p => p.name.equals(columnSchema.getColumnName)).get
+    }
   }
 
   /**
@@ -269,6 +425,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * 3. Filter Expression rules.
    *    3.1 Updated filter expression attributes with child table attributes
    * 4. Update the Parent Logical relation with child Logical relation
+   * 5. Order by plan rules.
+   *    5.1 Update project list based on updated aggregate expression
+   *    5.2 Update sort order attributes based on pre aggregate table
    *
    * @param logicalPlan
    * parent logical plan
@@ -283,6 +442,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       childPlan: LogicalPlan): LogicalPlan = {
     val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
     logicalPlan.transform {
+      // case for aggregation query
       case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
@@ -297,9 +457,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         Aggregate(updatedGroupExp,
           updatedAggExp,
           newChild)
+        // case of handling aggregation query with filter
       case Aggregate(grExp,
-      aggExp,
-      Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
+        aggExp,
+        Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
         val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
@@ -314,6 +475,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           updatedAggExp,
           Filter(updatedFilterExpression.get,
             newChild))
+        // case for aggregation query
       case Aggregate(grExp, aggExp, l: LogicalRelation)
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
@@ -328,10 +490,156 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         Aggregate(updatedGroupExp,
           updatedAggExp,
           newChild)
+        // case for aggregation query with order by
+      case Project(_,
+        Sort(sortOrders,
+          global,
+          Aggregate(groupingExp,
+            aggregateExp,
+            subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, None) =
+          getUpdatedExpressions(groupingExp,
+            aggregateExp,
+            subQuery,
+            None,
+            aggDataMapSchema,
+            attributes,
+            childPlan)
+        val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
+          sortOrders,
+          aggDataMapSchema,
+          attributes)
+        Project(updatedProjectList,
+          Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)))
+       // case for handling aggregation query with filter and order by
+      case Project(_,
+        Sort(sortOrders,
+          global,
+          Aggregate(groupingExp,
+            aggregateExp,
+            Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
+          getUpdatedExpressions(groupingExp,
+            aggregateExp,
+            subQuery,
+            Some(expression),
+            aggDataMapSchema,
+            attributes,
+            childPlan)
+        val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
+          sortOrders,
+          aggDataMapSchema,
+          attributes)
+        Project(updatedProjectList,
+          Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp,
+            Filter(updatedFilterExpression.get, newChild))))
+      // case for handling aggregation with order by when only projection column exists
+      case Sort(sortOrders,
+        global,
+        Aggregate(
+          groupingExp,
+          aggregateExp,
+          subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
+        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+             .hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, None) =
+          getUpdatedExpressions(groupingExp,
+            aggregateExp,
+            subQuery,
+            None,
+            aggDataMapSchema,
+            attributes,
+            childPlan)
+        val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
+          sortOrders,
+          aggDataMapSchema,
+          attributes)
+        Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))
+      // case for handling aggregation with order by and filter when only projection column exists
+      case Sort(sortOrders,
+        global,
+        Aggregate(groupingExp,
+          aggregateExp,
+          Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
+          getUpdatedExpressions(groupingExp,
+            aggregateExp,
+            subQuery,
+            Some(expression),
+            aggDataMapSchema,
+            attributes,
+            childPlan)
+        val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
+          sortOrders,
+          aggDataMapSchema,
+          attributes)
+        Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))
     }
   }
 
   /**
+   * Below method will be used to update the maintable plan for order by queries.
+   * In case of order by we need to update project list and sort order attributes.
+   *
+   * @param aggregateExp
+   *                     child table aggregate expression
+   * @param sortOrders
+   *                   sort order expression in maintable plan
+   * @param aggDataMapSchema
+   *                         child data map schema
+   * @param attributes
+   *                   child attributes
+   * @return updated project list and updated sort order
+   */
+  def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression],
+      sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema,
+      attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) = {
+    val updatedProjectList = new ArrayBuffer[NamedExpression]()
+    // getting the updated project list from aggregate expression
+    aggregateExp.foreach{f => f.transform {
+      // for projection column
+      case alias@Alias(attr: AttributeReference, name) =>
+        updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId,
+          alias.qualifier,
+          alias.isGenerated)
+        alias
+        // for aggregation column
+      case alias@Alias(attr: AggregateExpression, name) =>
+        updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId,
+          alias.qualifier,
+          alias.isGenerated)
+        alias
+    }
+    }
+    // getting the updated sort order
+    val updatedSortOrders = sortOrders.map { order =>
+      order.child match {
+        case attr: AttributeReference =>
+          val childAttribute = getChildAttributeReference(aggDataMapSchema,
+            attr,
+            attributes,
+            canBeNull = true)
+          // child attribute can be null only in case of alias in query
+          // so in that case we need to update the sort order based on the new alias
+          if (null != childAttribute) {
+            val childExpression = getUpdatedSortOrderExpression(childAttribute, aggregateExp)
+            SortOrder(childExpression, order.direction)
+          } else {
+            val childExpression = getUpdatedSortOrderExpression(attr, aggregateExp)
+            SortOrder(childExpression, order.direction)
+          }
+      }
+    }
+    (updatedProjectList, updatedSortOrders)
+  }
+  /**
    * Below method will be used to get the updated expression for pre aggregated table.
    * It will replace the attribute of actual plan with child table attributes.
    * Updation will be done for below expression.
@@ -352,6 +660,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * pre aggregate table schema
    * @param attributes
    * pre aggregate table logical relation
+   * @param aggPlan
+   *                aggregate logical plan
    * @return tuple of(updated grouping expression,
    *         updated aggregate expression,
    *         updated child logical plan,
@@ -363,7 +673,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       aggDataMapSchema: DataMapSchema,
       attributes: Seq[AttributeReference],
       aggPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
-    Option[Expression]) = {
+      Option[Expression]) = {
     // transforming the group by expression attributes with child attributes
     val updatedGroupExp = groupingExpressions.map { exp =>
       exp.transform {
@@ -424,7 +734,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         aggPlan match {
           case s: SubqueryAlias => s.child
           case others => others
-        }
+    }
     }
     // updating the filter expression if present
     val updatedFilterExpression = if (filterExpression.isDefined) {
@@ -440,6 +750,32 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 
   /**
+   * Below method will be used to get the updated sort order attribute
+   * based on pre aggregate table
+   * @param sortOrderAttr
+   *                      sort order attributes reference
+   * @param aggregateExpressions
+   *                             aggregate expression
+   * @return updated sort order attribute
+   */
+  def getUpdatedSortOrderExpression(sortOrderAttr: AttributeReference,
+      aggregateExpressions: Seq[NamedExpression]): Expression = {
+    val updatedExpression = aggregateExpressions collectFirst {
+      // in case of alias we need to match with alias name and when alias is not present
+      // we need to compare with attribute reference name
+      case alias@Alias(attr: AttributeReference, name)
+        if attr.name.equals(sortOrderAttr.name) || name.equals(sortOrderAttr.name) =>
+          AttributeReference(name,
+            attr.dataType,
+            attr.nullable,
+            attr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
+    }
+    // in any case it will match the condition, so there is no need to check whether the
+    // updated expression is empty or not
+    updatedExpression.get
+  }
+
+  /**
    * Below method will be used to get the aggregate expression based on match
    * Aggregate expression updation rules
    * 1 Change the count AggregateExpression to Sum as count
@@ -532,11 +868,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           "sum")),
           aggExp.mode,
           isDistinct = false),
-          AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+          AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
             attr,
             attributes,
-            "count"),
-            LongType)),
+            "count")),
             aggExp.mode,
             isDistinct = false))
       // In case of average aggregate function select 2 columns from aggregate table
@@ -547,14 +882,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr,
           attributes,
           "sum"),
-          changeDataType)),
+          DoubleType)),
           aggExp.mode,
           isDistinct = false),
           AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
             attr,
             attributes,
             "count"),
-            LongType)),
+            DoubleType)),
             aggExp.mode,
             isDistinct = false))
     }
@@ -587,21 +922,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * parent carbon table
    * @param tableName
    * parent table name
-   * @param list
+   * @param set
    * list of attributes
    * @return plan is valid
    */
   def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression],
       aggregateExpressions: Seq[NamedExpression],
       carbonTable: CarbonTable, tableName: String,
-      list: scala.collection.mutable.ListBuffer[QueryColumn]): Boolean = {
+      set: scala.collection.mutable.HashSet[QueryColumn]): Boolean = {
     aggregateExpressions.map {
       case attr: AttributeReference =>
-        list += getQueryColumn(attr.name,
+        set += getQueryColumn(attr.name,
           carbonTable,
           tableName);
       case Alias(attr: AttributeReference, _) =>
-        list += getQueryColumn(attr.name,
+        set += getQueryColumn(attr.name,
           carbonTable,
           tableName);
       case Alias(attr: AggregateExpression, _) =>
@@ -612,7 +947,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           attr.aggregateFunction,
           tableName)
         if (queryColumn.nonEmpty) {
-          list ++= queryColumn
+          set ++= queryColumn
         } else {
           return false
         }
@@ -741,15 +1076,20 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       dataType: String = "",
       isChangedDataType: Boolean = false,
       isFilterColumn: Boolean = false): QueryColumn = {
-    val columnSchema = carbonTable.getColumnByName(tableName,
-      columnName.toLowerCase).getColumnSchema
-    if (isChangedDataType) {
-      new QueryColumn(columnSchema, columnSchema.getDataType.getName,
-        aggFunction.toLowerCase, isFilterColumn)
+    val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
+    if(null == columnSchema) {
+      null
     } else {
-      new QueryColumn(columnSchema,
+      if (isChangedDataType) {
+        new QueryColumn(columnSchema.getColumnSchema,
+          columnSchema.getDataType.getName,
+          aggFunction.toLowerCase,
+          isFilterColumn)
+      } else {
+        new QueryColumn(columnSchema.getColumnSchema,
         CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
         aggFunction.toLowerCase, isFilterColumn)
+      }
     }
   }
 }
@@ -867,7 +1207,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
         } else if (version.startsWith("2.2")) {
           CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
-        } else {
+      } else {
           throw new UnsupportedOperationException(s"Spark version $version is not supported")
         }
       } else {
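
Taken together, the rule now recognizes four additional plan shapes around the
existing Aggregate cases. A condensed sketch of the new matches (simplified:
the real cases also require the relation to be a CarbonDatasourceHadoopRelation
whose table hasDataMapSchema, and match CarbonSubqueryAlias over a
LogicalRelation):

    import org.apache.spark.sql.catalyst.plans.logical._

    // Filter cases are listed first so the plain Aggregate pattern does not shadow them
    def matchesNewOrderByShapes(plan: LogicalPlan): Boolean = plan match {
      case Project(_, Sort(_, _, Aggregate(_, _, Filter(_, _)))) => true // order by + filter
      case Project(_, Sort(_, _, Aggregate(_, _, _)))            => true // order by
      case Sort(_, _, Aggregate(_, _, Filter(_, _)))             => true // projection-only, with filter
      case Sort(_, _, Aggregate(_, _, _))                        => true // projection-only order by
      case _                                                     => false
    }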

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f70e6d70/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 911c25d..c54f3fe 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -20,7 +20,7 @@ import java.lang.reflect.Constructor
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
@@ -155,7 +155,6 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
     catalog.ParquetConversions ::
     catalog.OrcConversions ::
     CarbonPreInsertionCasts(sparkSession) ::
-    CarbonPreAggregateQueryRules(sparkSession) ::
     CarbonPreAggregateDataLoadingRules ::
     CarbonIUDAnalysisRule(sparkSession) ::
     AnalyzeCreateTable(sparkSession) ::
@@ -163,22 +162,22 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
     DataSourceAnalysis(conf) ::
     (if (conf.runSQLonFile) {
       new ResolveDataSource(sparkSession) :: Nil
-    } else {  Nil }
-      )
+    } else {  Nil })
   }
 
-  override lazy val analyzer: Analyzer = {
-    new Analyzer(catalog, conf) {
-      override val extendedResolutionRules =
-        if (extendedAnalyzerRules.nonEmpty) {
-          extendedAnalyzerRules ++ internalAnalyzerRules
-        } else {
-          internalAnalyzerRules
-        }
-      override val extendedCheckRules = Seq(
-        PreWriteCheck(conf, catalog))
-    }
-  }
+  override lazy val analyzer: Analyzer =
+    new CarbonAnalyzer(catalog, conf, sparkSession,
+      new Analyzer(catalog, conf) {
+        override val extendedResolutionRules =
+          if (extendedAnalyzerRules.nonEmpty) {
+            extendedAnalyzerRules ++ internalAnalyzerRules
+          } else {
+            internalAnalyzerRules
+          }
+        override val extendedCheckRules = Seq(
+          PreWriteCheck(conf, catalog))
+      }
+  )
 
   /**
    * Internal catalog for managing table and database states.
@@ -195,6 +194,16 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
   }
 }
 
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: CatalystConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val logicalPlan = analyzer.execute(plan)
+    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+  }
+}
+
 class CarbonOptimizer(
     catalog: SessionCatalog,
     conf: SQLConf,
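
A note on the design change above: CarbonPreAggregateQueryRules is removed from
the extended resolution rules and instead applied by CarbonAnalyzer after the
wrapped analyzer has finished, so the rewrite runs once over the fully resolved
plan rather than inside the fixed-point resolution batch. A minimal,
self-contained sketch of the decorator idea (generic types, not the real Spark
classes):

    // run the inner analyzer to completion, then post-process its output once
    abstract class PlanRule[T] { def apply(plan: T): T }

    class PostProcessingAnalyzer[T](inner: T => T, post: PlanRule[T]) {
      def execute(plan: T): T = post(inner(plan))
    }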

