carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-1926][CARBONDATA-1927][Pre-Aggregate] Expression support inside aggregate function for Query
Date Mon, 08 Jan 2018 06:22:57 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master af4277e94 -> c70e73f11


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 1c93617..bd8b4c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -18,29 +18,46 @@
 package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
 
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
-import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
-import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.preagg.{AggregateQueryPlan, AggregateTableSelector, QueryColumn}
 import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
+ * Model class to store an aggregate expression's logical plan
+ * and its column schema mapping
+ * @param expression aggregate expression
+ * @param columnSchema column schema from the table, once the expression is mapped
+ */
+case class AggExpToColumnMappingModel(
+    expression: Expression,
+    var columnSchema: Option[Object] = None) {
+  override def equals(o: Any): Boolean = o match {
+    case that: AggExpToColumnMappingModel =>
+      that.expression == this.expression
+    case _ => false
+  }
+  // TODO need to update the hash code generation code
+  override def hashCode: Int = 1
+}
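As the TODO above concedes, the constant hashCode is only a stopgap: it keeps the equals/hashCode contract intact but degrades every hash-based collection of these models to a linear scan. A minimal sketch of one way it could be resolved, assuming delegation to the wrapped expression's own hashCode is acceptable (illustrative only, not part of this patch):

    import org.apache.spark.sql.catalyst.expressions.Expression

    // hypothetical variant; only the hashCode differs from the class above
    case class AggExpToColumnMappingModelSketch(
        expression: Expression,
        var columnSchema: Option[Object] = None) {
      override def equals(o: Any): Boolean = o match {
        case that: AggExpToColumnMappingModelSketch =>
          that.expression == this.expression
        case _ => false
      }
      // consistent with equals: equal expressions produce equal hash codes
      override def hashCode: Int = expression.hashCode
    }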
+/**
  * Class for applying Pre Aggregate rules
  * Responsibility.
  * 1. Check plan is valid plan for updating the parent table plan with child table
@@ -73,11 +90,36 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  *    6.1 validate maintable has timeseries datamap
  *    6.2 timeseries function is valid function or not
  *
- * @param sparkSession
- * spark session
+ * @param sparkSession spark session
  */
 case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 
+  /**
+   * map from parent attribute reference to child attribute reference;
+   * this will be used to update the plan in case of join or order by
+   */
+  val updatedExpression = mutable.HashMap[AttributeReference, AttributeReference]()
+
+  /**
+   * parser
+   */
+  lazy val parser = new CarbonSpark2SqlParser
+
+  /**
+   * Below method will be used to validate the logical plan
+   * @param logicalPlan query logical plan
+   * @return whether the plan is valid for transformation
+   */
+  private def isValidPlan(logicalPlan: LogicalPlan): Boolean = {
+    var isValidPlan = true
+    logicalPlan.transform {
+      case aggregate@Aggregate(grp, aExp, child) =>
+        isValidPlan = !aExp.exists(p => p.name.equals("preAggLoad") || p.name.equals("preAgg"))
+        val updatedAggExp = aExp.filterNot(_.name.equalsIgnoreCase("preAggLoad"))
+        Aggregate(grp, updatedAggExp, child)
+    }
+    isValidPlan
+  }
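The marker check above in miniature: internal pre-aggregate load commands tag their plans with columns named preAgg or preAggLoad, and any plan carrying such a marker is left untouched by this rule. A self-contained sketch, with toy strings standing in for the aggregate output names:

    // a plan is transformable only when no internal marker column is present
    def isValidPlanSketch(aggOutputNames: Seq[String]): Boolean =
      !aggOutputNames.exists(n => n == "preAggLoad" || n == "preAgg")

    isValidPlanSketch(Seq("sum(price)"))               // true: ordinary user query
    isValidPlanSketch(Seq("sum(price)", "preAggLoad")) // false: internal load plan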
   override def apply(plan: LogicalPlan): LogicalPlan = {
     var needAnalysis = true
     plan.transformExpressions {
@@ -98,244 +140,403 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         needAnalysis = false
         attr
     }
+    if(needAnalysis) {
+      needAnalysis = isValidPlan(plan)
+    }
     // if plan is not valid for transformation then return same plan
     if (!needAnalysis) {
       plan
     } else {
-      // create buffer to collect all the column and its metadata information
-      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
-      var isValidPlan = true
-      val carbonTable = plan match {
-        // matching the plan based on supported plan
-        // if plan is matches with any case it will validate and get all
-        // information required for transforming the plan
+      val updatedPlan = transformPreAggQueryPlan(plan)
+      val newPlan = updatePlan(updatedPlan)
+      newPlan
+    }
+  }
 
-        // When plan has grouping expression, aggregate expression
-        // subquery
-        case Aggregate(groupingExp,
-          aggregateExp,
-          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
-          // only carbon query plan is supported checking whether logical relation is
-          // is for carbon
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          // if it is valid plan then extract the query columns
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
-            carbonTable,
-            tableName,
-            list)
-          carbonTable
+  /**
+   * Below method will be used to update the child plan.
+   * This will be used for updating expressions like join conditions,
+   * order by and project lists
+   * @param plan child plan
+   * @return updated plan
+   */
+  def updatePlan(plan: LogicalPlan): LogicalPlan = {
+    val updatedPlan = plan transform {
+      case Aggregate(grp, aggExp, child) =>
+        Aggregate(
+          updateExpression(grp),
+          updateExpression(aggExp.asInstanceOf[Seq[Expression]]).asInstanceOf[Seq[NamedExpression]],
+          child)
+      case Filter(filterExp, child) =>
+        Filter(updateExpression(Seq(filterExp)).head, child)
+      case Project(pList, child) =>
+        Project(
+          updateExpression(pList.asInstanceOf[Seq[Expression]]).asInstanceOf[Seq[NamedExpression]],
+          child)
+      case Sort(sortOrders, global, child) =>
+        Sort(updateSortExpression(sortOrders), global, child)
+      case Join(left, right, joinType, condition) =>
+        val updatedCondition = condition match {
+          case Some(expr) => Some(updateExpression(Seq(expr)).head)
+          case _ => condition
+        }
+        Join(left, right, joinType, updatedCondition)
+    }
+    updatedPlan
+  }
 
-        // below case for handling filter query
-        // When plan has grouping expression, aggregate expression
-        // filter expression
-        case Aggregate(groupingExp,
-          aggregateExp,
-          Filter(filterExp,
-          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
-          // only carbon query plan is supported checking whether logical relation is
-          // is for carbon
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          // if it is valid plan then extract the query columns
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
-            carbonTable,
-            tableName,
-            list)
-          if(isValidPlan) {
-            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
-          }
-          // getting the columns from filter expression
-          if(isValidPlan) {
-            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+  /**
+   * Below method will be used to update the sort expression
+   * @param sortExp sort order expression in query
+   * @return updated sort expression
+   */
+  def updateSortExpression(sortExp: Seq[SortOrder]): Seq[SortOrder] = {
+    sortExp map { order =>
+      SortOrder(order.child transform {
+        case attr: AttributeReference =>
+          updatedExpression.find { p => p._1.sameRef(attr) } match {
+            case Some((_, childAttr)) =>
+              AttributeReference(
+                childAttr.name,
+                childAttr.dataType,
+                childAttr.nullable,
+                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+            case None =>
+              attr
           }
-          carbonTable
+      }, order.direction)
+    }
+  }
 
-        // When plan has grouping expression, aggregate expression
-        // logical relation
-        case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
-          // only carbon query plan is supported checking whether logical relation is
-          // is for carbon
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          // if it is valid plan then extract the query columns
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
-            carbonTable,
-            tableName,
-            list)
-          carbonTable
-        // case for handling aggregation, order by
-        case Project(projectList,
-          Sort(sortOrders,
-            _,
-            Aggregate(groupingExp,
-              aggregateExp,
-              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
-            carbonTable,
-            tableName,
-            list)
-          if(isValidPlan) {
-            list ++
-            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
-          }
-          carbonTable
-        // case for handling aggregation, order by and filter
-        case Project(projectList,
-          Sort(sortOrders,
-            _,
-            Aggregate(groupingExp,
-              aggregateExp,
-              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))))
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
-            carbonTable,
-            tableName,
-            list)
-          if(isValidPlan) {
-            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
-          }
-          if (isValidPlan) {
-            list ++
-            extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
-            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
+  /**
+   * Below method will be used to update expressions such as group by expressions
+   * @param expressions sequence of expressions, e.g. group by
+   * @return updated expressions
+   */
+  def updateExpression(expressions: Seq[Expression]): Seq[Expression] = {
+    expressions map { expression =>
+      expression transform {
+        case attr: AttributeReference =>
+          updatedExpression.find { p => p._1.sameRef(attr) } match {
+            case Some((_, childAttr)) =>
+              AttributeReference(
+                childAttr.name,
+                childAttr.dataType,
+                childAttr.nullable,
+                childAttr.metadata)(childAttr.exprId, attr.qualifier, attr.isGenerated)
+            case None =>
+              attr
           }
-          carbonTable
-        // case for handling aggregation with order by when only projection column exits
-        case Sort(sortOrders,
-          _,
-          Aggregate(groupingExp,
-            aggregateExp,
-            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
+      }
+    }
+  }
+
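The remapping performed by updatePlan, updateSortExpression and updateExpression above, as a self-contained sketch against Catalyst; the attribute names are made up for illustration:

    import scala.collection.mutable

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    import org.apache.spark.sql.types.DoubleType

    val parentAttr = AttributeReference("price", DoubleType)()
    val childAttr  = AttributeReference("maintable_price", DoubleType)()

    // populated while the Aggregate itself is rewritten (see getUpdatedExpressions)
    val mapping = mutable.HashMap(parentAttr -> childAttr)

    def remap(e: Expression): Expression = e.transform {
      case attr: AttributeReference =>
        // sameRef compares exprIds, so aliased occurrences still match
        mapping.find(_._1.sameRef(attr)).map(_._2).getOrElse(attr)
    }
    // remap(parentAttr) now refers to the child table's maintable_price column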
+  /**
+   * Below method will be used to validate and transform the main table plan to the
+   * child table plan. The rules for transforming are as below:
+   * 1. Grouping expression rules
+   *    1.1 Change the parent attribute reference of the group expression
+   * to the child attribute reference
+   *
+   * 2. Aggregate expression rules
+   *    2.1 Change the parent attribute reference of the aggregate expression to the
+   * child attribute reference
+   *    2.2 Change the count AggregateExpression to Sum, as the count
+   * is already calculated, so on the aggregate table
+   * we need to apply sum to get the count
+   *    2.3 In case of the average aggregate function, select 2 columns from the aggregate
+   * table with aggregations sum and count, then add divide(sum(column with sum),
+   * sum(column with count)).
+   * Note: during aggregate table creation, for average, the table is created with two
+   * columns, one for sum(column) and one for count(column), to support rollup
+   * 3. Filter expression rules
+   *    3.1 Update filter expression attributes with child table attributes
+   * 4. Update the parent logical relation with the child logical relation
+   * 5. Timeseries function
+   *    5.1 validate that the parent table has a timeseries datamap
+   *    5.2 validate that the timeseries function is a valid function
+   * (an illustrative sketch of rules 2.2 and 2.3 follows this method)
+   *
+   * @param logicalPlan parent logical plan
+   * @return transformed plan
+   */
+  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+    val updatedPlan = logicalPlan.transform {
+      // case for aggregation query
+      case agg@Aggregate(
+        grExp,
+        aggExp,
+        child@CarbonSubqueryAlias(_, l: LogicalRelation))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
+        val carbonTable = getCarbonTable(l)
+        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+        val isValidPlan = extractQueryColumnsFromAggExpression(
+          grExp,
+          aggExp,
+          carbonTable,
+          list,
+          aggregateExpressions)
+        if(isValidPlan) {
+          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+            aggregateExpressions,
             carbonTable,
-            tableName,
-            list)
-          if(isValidPlan) {
-            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
-              carbonTable = carbonTable,
-              tableName = tableName)
+            agg)
+          if (null != aggDataMapSchema && null != childPlan) {
+            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+            val (updatedGroupExp, updatedAggExp, newChild, None) =
+              getUpdatedExpressions(grExp,
+                aggExp,
+                child,
+                None,
+                aggDataMapSchema,
+                attributes,
+                childPlan,
+                carbonTable,
+                agg)
+            Aggregate(updatedGroupExp,
+              updatedAggExp,
+              newChild)
+          } else {
+            agg
           }
-          carbonTable
-        // case for handling aggregation with order by and filter when only projection column exits
-        case Sort(sortOrders,
-          _,
-          Aggregate(groupingExp,
-            aggregateExp,
-            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
-          if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-             logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-               metaData.hasAggregateDataMapSchema =>
-          val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
-          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
-            aggregateExp,
+        } else {
+          agg
+        }
+      // case of handling aggregation query with filter
+      case agg@Aggregate(
+        grExp,
+        aggExp,
+        Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
+             metaData.hasAggregateDataMapSchema =>
+        val carbonTable = getCarbonTable(l)
+        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
+        val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
+        var isValidPlan = extractQueryColumnsFromAggExpression(
+          grExp,
+          aggExp,
+          carbonTable,
+          list,
+          aggregateExpressions)
+        // a predicate subquery in the filter cannot be served by a pre-aggregate table
+        isValidPlan = isValidPlan && !CarbonReflectionUtils.hasPredicateSubquery(expression)
+        if (isValidPlan) {
+          // getting the columns from the filter expression
+          extractColumnFromExpression(expression, list, carbonTable, isFilterColumn = true)
+        }
+        if(isValidPlan) {
+          val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
+            aggregateExpressions,
             carbonTable,
-            tableName,
-            list)
-          if(isValidPlan) {
-            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
-          }
-          if(isValidPlan) {
-            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
-              carbonTable = carbonTable,
-              tableName = tableName)
-            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
-          }
-          carbonTable
-        case _ =>
-          isValidPlan = false
-          null
-      }
-      if (isValidPlan && null != carbonTable) {
-        isValidPlan = isSpecificSegmentPresent(carbonTable)
-      }
-      // if plan is valid then update the plan with child attributes
-      if (isValidPlan) {
-        // getting all the projection columns
-        val listProjectionColumn = list
-          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
-          .toList
-        // getting all the filter columns
-        val listFilterColumn = list
-          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
-          .toList
-        // getting all the aggregation columns
-        val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
-          .toList
-        // create a query plan object which will be used to select the list of pre aggregate tables
-        // matches with this plan
-        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
-          listAggregationColumn.asJava,
-          listFilterColumn.asJava)
-        // create aggregate table selector object
-        val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
-        // select the list of valid child tables
-        val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
-        // if it does not match with any pre aggregate table return the same plan
-        if (!selectedDataMapSchemas.isEmpty) {
-          // filter the selected child schema based on size to select the pre-aggregate tables
-          // that are nonEmpty
-          val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          val relationBuffer = selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
-            val identifier = TableIdentifier(
-              selectedDataMapSchema.getRelationIdentifier.getTableName,
-              Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
-            val carbonRelation =
-              catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
-            val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
-            (selectedDataMapSchema, carbonRelation, relation)
-          }.filter(_._2.sizeInBytes != 0L)
-          if (relationBuffer.isEmpty) {
-            // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
-            // tables have date yet.
-            // In this case we would return the original plan so that the query hits the parent
-            // table.
-            plan
+            agg)
+          if (null != aggDataMapSchema && null != childPlan) {
+            val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
+            val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
+              getUpdatedExpressions(grExp,
+                aggExp,
+                child,
+                Some(expression),
+                aggDataMapSchema,
+                attributes,
+                childPlan,
+                carbonTable,
+                agg)
+            Aggregate(updatedGroupExp,
+              updatedAggExp,
+              Filter(updatedFilterExpression.get,
+                newChild))
           } else {
-            // If the relationBuffer is nonEmpty then find the table with the minimum size.
-            val (aggDataMapSchema, _, relation) = relationBuffer.minBy(_._2.sizeInBytes)
-            val newRelation = new FindDataSourceTable(sparkSession).apply(relation)
-            // transform the query plan based on selected child schema
-            transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+            agg
           }
         } else {
-          plan
+          agg
         }
+    }
+    updatedPlan
+  }
+
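The rewrite described by rules 2.2 and 2.3 in the comment on the method above, sketched directly against Catalyst; the child column names price_sum and price_count are illustrative only, not taken from this patch:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Divide}
    import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
    import org.apache.spark.sql.types.{DoubleType, LongType}

    // columns materialized in the pre-aggregate (child) table
    val sumCol   = AttributeReference("price_sum", DoubleType)()
    val countCol = AttributeReference("price_count", LongType)()

    // rule 2.2: count(price) on the main table becomes sum(price_count) on the child
    val rewrittenCount = AggregateExpression(Sum(countCol), Complete, isDistinct = false)

    // rule 2.3: avg(price) becomes sum(price_sum) / sum(price_count)
    val rewrittenAvg = Divide(
      AggregateExpression(Sum(sumCol), Complete, isDistinct = false),
      Cast(AggregateExpression(Sum(countCol), Complete, isDistinct = false), DoubleType))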
+  /**
+   * Below method will be used to validate the query plan and, if the plan is valid for
+   * transformation, get the proper aggregation data map schema and child relation plan object
+   * @param queryColumns list of query columns from projection and filter
+   * @param aggregateExpressions list of aggregate expression (aggregate function)
+   * @param carbonTable parent carbon table
+   * @param parentLogicalPlan parent logical relation
+   * @return if plan is valid then aggregation data map schema and its relation plan
+   */
+  def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn],
+      aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression],
+      carbonTable: CarbonTable,
+      parentLogicalPlan: LogicalPlan): (AggregationDataMapSchema, LogicalPlan) = {
+    // getting all the projection columns
+    val listProjectionColumn = queryColumns
+      .filter(queryColumn => !queryColumn.isFilterColumn)
+      .toList
+    // getting all the filter columns
+    val listFilterColumn = queryColumns
+      .filter(queryColumn => queryColumn.isFilterColumn)
+      .toList
+    val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
+    // create a query plan object which will be used to select the list of pre aggregate tables
+    // matches with this plan
+    val queryPlan = new AggregateQueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava)
+    // create aggregate table selector object
+    val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
+    // select the list of valid child tables
+    val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
+    // if the query has only aggregate expressions then the selected data map will be empty;
+    // in that case validate all the child data maps, otherwise validate the selected data maps
+    val selectedAggMaps = if (isProjectionColumnPresent) {
+      selectedDataMapSchemas
+    } else {
+      carbonTable.getTableInfo.getDataMapSchemaList
+    }
+    // if it does not match with any pre aggregate table return the same plan
+    if (!selectedAggMaps.isEmpty) {
+      // filter the selected child schema based on size to select the pre-aggregate tables
+      // that are nonEmpty
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema =>
+        val identifier = TableIdentifier(
+          selectedDataMapSchema.getRelationIdentifier.getTableName,
+          Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+        val carbonRelation =
+          catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+        val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
+        (selectedDataMapSchema, carbonRelation, relation)
+      }.filter(_._2.sizeInBytes != 0L).sortBy(_._2.sizeInBytes)
+      if (relationBuffer.isEmpty) {
+        // If the size of relation Buffer is 0 then it means that none of the pre-aggregate
+        // tables have data yet.
+        // In this case we would return the original plan so that the query hits the parent
+        // table.
+        (null, null)
       } else {
-        plan
+        // relationBuffer is sorted by size, so its head is the smallest non-empty table;
+        // if the query has no aggregate function there is no need to validate the data map
+        val tuple = if (aggregateExpressions.nonEmpty && !selectedAggMaps.isEmpty) {
+          relationBuffer.collectFirst {
+            case a@(datamapSchema, _, _)
+              if validateAggregateExpression(datamapSchema,
+                carbonTable,
+                parentLogicalPlan,
+                aggregateExpressions.toSeq) =>
+              a
+          }
+        } else {
+          Some(relationBuffer.head)
+        }
+        tuple match {
+          case Some((dataMapSchema, _, logicalPlan)) =>
+            (dataMapSchema.asInstanceOf[AggregationDataMapSchema],
+              new FindDataSourceTable(sparkSession).apply(logicalPlan))
+          case None => (null, null)
+        }
+      }
+    } else {
+      (null, null)
+    }
+  }
+
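The table-selection policy of the method above, reduced to its core: discard pre-aggregate tables that hold no data yet, then prefer the smallest of the remainder. A toy model with made-up table names and sizes:

    case class Candidate(name: String, sizeInBytes: Long)

    val candidates = Seq(
      Candidate("sales_agg_hour", 0L),     // empty: no data loaded yet
      Candidate("sales_agg_day", 4096L),
      Candidate("sales_agg_month", 512L))

    // drop empty tables, sort by size, take the smallest if any survive
    val chosen = candidates.filter(_.sizeInBytes != 0L).sortBy(_.sizeInBytes).headOption
    // chosen == Some(Candidate("sales_agg_month", 512)); an empty result corresponds
    // to the (null, null) return, i.e. the query falls back to the parent table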
+  /**
+   * Below method will be used to validate the query's aggregate expressions against the
+   * data map and will return whether the data map can serve all of them
+   * @param selectedDataMap data map to validate against
+   * @param carbonTable parent carbon table
+   * @param parentLogicalPlan parent logical plan
+   * @param queryAggExpLogicalPlans aggregate expressions from the query
+   * @return true if every aggregate expression is supported by the data map
+   */
+  def validateAggregateExpression(selectedDataMap: DataMapSchema,
+      carbonTable: CarbonTable,
+      parentLogicalPlan: LogicalPlan,
+      queryAggExpLogicalPlans: Seq[AggregateExpression]): Boolean = {
+    val mappingModel = getExpressionToColumnMapping(selectedDataMap,
+      carbonTable,
+      parentLogicalPlan)
+    queryAggExpLogicalPlans.forall { p =>
+      mappingModel.exists { m =>
+        PreAggregateUtil.normalizeExprId(p, parentLogicalPlan.allAttributes) == m.expression
+      }
+    }
+  }
+
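The containment check above in miniature: every aggregate expression the query needs must have a normalized match in the data map's mapping; the data map may offer more. Toy strings stand in for normalized expression trees:

    val queryAggs   = Seq("sum(price)", "count(price)")
    val dataMapAggs = Set("sum(price)", "count(price)", "max(price)")

    // forall over the query side, exists over the data map side
    val servable = queryAggs.forall(dataMapAggs.contains)  // true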
+  /**
+   * Below method will be used to get the logical plan for each aggregate expression in the
+   * child data map and its column schema mapping. If the mapping is already present
+   * then it is reused; otherwise it is generated and stored in the aggregation data map.
+   * @param selectedDataMap child data map
+   * @param carbonTable parent table
+   * @param parentLogicalPlan logical relation of actual plan
+   * @return mapping of each aggregate expression's logical plan in the child query to its column
+   */
+  def getExpressionToColumnMapping(selectedDataMap: DataMapSchema,
+      carbonTable: CarbonTable,
+      parentLogicalPlan: LogicalPlan): mutable.Set[AggExpToColumnMappingModel] = {
+    val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
+    if (null == aggDataMapSchema.getAggExpToColumnMapping) {
+      // add the preAgg UDF so the PreAggregate rules are not applied to this internal query
+      val childDataMapQueryString = parser.addPreAggFunction(
+        aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
+      // get the logical plan
+      val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
+      // getting all aggregate expression from query
+      val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
+      // in case of average the child table will have two columns stored in sequence,
+      // so for an average expression we need to get two columns for the mapping
+      var counter = 0
+      // sorting the columns based on schema ordinal so search will give proper result
+      val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
+        .sortBy(_.getSchemaOrdinal)
+      val expressionToColumnMapping = mutable.LinkedHashSet.empty[AggExpToColumnMappingModel]
+      dataMapAggExp.foreach { aggExp =>
+        val updatedExp = PreAggregateUtil.normalizeExprId(aggExp, aggPlan.allAttributes)
+        val model = AggExpToColumnMappingModel(updatedExp, None)
+        if (!expressionToColumnMapping.contains(model)) {
+          // get the column mapped to this aggregate expression, starting the
+          // search from the current schema ordinal
+          val columnSchema = aggDataMapSchema
+            .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
+          // increment the counter so that when the above code runs for the
+          // next expression it will search from that schema ordinal onwards
+          counter = columnSchema.getSchemaOrdinal + 1
+          model.columnSchema = Some(columnSchema)
+          expressionToColumnMapping += model
+        }
       }
+      aggDataMapSchema.setAggExpToColumnMapping(expressionToColumnMapping.asJava)
+      // return the mapping
+      expressionToColumnMapping
+    } else {
+      aggDataMapSchema.getAggExpToColumnMapping
+        .asInstanceOf[java.util.Set[AggExpToColumnMappingModel]].asScala
+        .asInstanceOf[mutable.LinkedHashSet[AggExpToColumnMappingModel]]
     }
   }
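Why the counter advances by schema ordinal (a toy stand-in for getAggColumnBasedOnIndex, with made-up column names): an average expression materializes as two adjacent child columns, sum then count, so each lookup must resume after the column consumed by the previous expression:

    case class Col(name: String, ordinal: Int)

    val sorted = Seq(Col("price_sum", 0), Col("price_count", 1), Col("qty_sum", 2))
    var counter = 0

    def nextColumnFor(prefix: String): Col = {
      val col = sorted.find(c => c.ordinal >= counter && c.name.startsWith(prefix)).get
      counter = col.ordinal + 1  // resume the next search after this column
      col
    }

    nextColumnFor("price") // Col(price_sum, 0);   counter is now 1
    nextColumnFor("price") // Col(price_count, 1); counter is now 2 (avg's second column)
    nextColumnFor("qty")   // Col(qty_sum, 2)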
 
   /**
+   * Below method will be used to get the aggregate expressions from the child data map query
+   * @param logicalPlan logical plan
+   * @return list of aggregate expressions
+   */
+  def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = {
+    val list = scala.collection.mutable.ListBuffer.empty[AggregateExpression]
+    logicalPlan match {
+      case Aggregate(_, aggExp, _) =>
+        aggExp foreach {
+          case Alias(exp: AggregateExpression, _) =>
+            list ++= PreAggregateUtil.validateAggregateFunctionAndGetFields(exp)
+          case _ =>
+        }
+    }
+    list
+  }
+
+  /**
    * Below method will be used to check whether specific segment is set for maintable
    * if it is present then no need to transform the plan and query will be executed on
    * maintable
-   * @param carbonTable
-   *                    parent table
+   * @param carbonTable parent table
    * @return is specific segment is present in session params
    */
   def isSpecificSegmentPresent(carbonTable: CarbonTable) : Boolean = {
@@ -353,26 +554,21 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   /**
    * Below method will be used to extract the query columns from
    * filter expression
-   * @param filterExp
-   *                  filter expression
-   * @param set
-   *             query column list
-   * @param carbonTable
-   *                    parent table
-   * @param tableName
-   *                  table name
+   * @param expression filter expression
+   * @param queryColumns query column set
+   * @param carbonTable parent table
    * @param isFilterColumn whether the extracted columns should be marked as filter columns
    */
-  def extractQueryColumnFromFilterExp(filterExp: Expression,
-      set: scala.collection.mutable.HashSet[QueryColumn],
-      carbonTable: CarbonTable, tableName: String): Boolean = {
+  def extractColumnFromExpression(expression: Expression,
+      queryColumns: scala.collection.mutable.HashSet[QueryColumn],
+      carbonTable: CarbonTable,
+      isFilterColumn: Boolean = false): Unit = {
     // map to maintain attribute reference present in the filter to timeseries function
     // if applied this is added to avoid duplicate column
     val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String]
-    val isValidPlan = true
-    filterExp.transform {
+    expression.transform {
       case attr: AttributeReference =>
-        if (!mapOfColumnSeriesFun.get(attr).isDefined) {
+        if (mapOfColumnSeriesFun.get(attr).isEmpty) {
           mapOfColumnSeriesFun.put(attr, null)
         }
         attr
@@ -381,13 +577,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         if (udf.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
           "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
             CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
-          mapOfColumnSeriesFun.put(udf.children(0).asInstanceOf[AttributeReference],
-            udf.children(1).asInstanceOf[Literal].value.toString)
+          mapOfColumnSeriesFun.put(udf.children.head.asInstanceOf[AttributeReference],
+            udf.children.last.asInstanceOf[Literal].value.toString)
         } else {
           // for any other scala udf
           udf.transform {
             case attr: AttributeReference =>
-              if (!mapOfColumnSeriesFun.get(attr).isDefined) {
+              if (mapOfColumnSeriesFun.get(attr).isEmpty) {
                 mapOfColumnSeriesFun.put(attr, null)
               }
               attr
@@ -396,90 +592,37 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         udf
     }
     mapOfColumnSeriesFun.foreach { f =>
-        if (f._2 == null) {
-          set +=
-          getQueryColumn(f._1.name, carbonTable, tableName, isFilterColumn = true)
-        } else {
-          set += getQueryColumn(f._1.name,
-            carbonTable,
-            carbonTable.getTableName,
-            isFilterColumn = true,
-            timeseriesFunction = f._2)
-        }
-    }
-    isValidPlan
-  }
-  /**
-   * Below method will be used to extract columns from order by expression
-   * @param projectList
-   *                    project list from plan
-   * @param sortOrders
-   *                   sort order in plan
-   * @param carbonTable
-   *                    carbon table
-   * @param tableName
-   *                  table name
-   * @return query columns from expression
-   */
-  def extractQueryColumnForOrderBy(projectList: Option[Seq[NamedExpression]] = None,
-      sortOrders: Seq[SortOrder],
-      carbonTable: CarbonTable,
-      tableName: String): Seq[QueryColumn] = {
-    val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
-    if(projectList.isDefined) {
-      projectList.get.map {
-        proList =>
-          proList.transform {
-            case attr: AttributeReference =>
-              val queryColumn = getQueryColumn(attr.name, carbonTable, tableName)
-              if (null != queryColumn) {
-                list += queryColumn
-              }
-              attr
-          }
+      if (f._2 == null) {
+        queryColumns += getQueryColumn(f._1.name, carbonTable, isFilterColumn)
+      } else {
+        queryColumns += getQueryColumn(f._1.name,
+          carbonTable,
+          isFilterColumn,
+          timeseriesFunction = f._2)
       }
     }
-    sortOrders.foreach { sortOrder =>
-        sortOrder.child match {
-          case attr: AttributeReference =>
-            val queryColumn = getQueryColumn(attr.name, carbonTable, tableName)
-            if (null != queryColumn) {
-              list += queryColumn
-            }
-        }
-    }
-    list
   }
 
   /**
    * Below method will be used to get the child attribute reference
    * based on parent name
    *
-   * @param dataMapSchema
-   * child schema
-   * @param attributeReference
-   * parent attribute reference
-   * @param attributes
-   * child logical relation
-   * @param aggFunction
-   * aggregation function applied on child
-   * @param canBeNull
-   * this is added for strict validation in which case child attribute can be
+   * @param dataMapSchema child schema
+   * @param attributeReference parent attribute reference
+   * @param attributes child logical relation
+   * @param canBeNull whether the child attribute is allowed to be null; when false a
+   * missing child column is treated as an error (strict validation)
    * @return child attribute reference
    */
   def getChildAttributeReference(dataMapSchema: DataMapSchema,
       attributeReference: AttributeReference,
       attributes: Seq[AttributeReference],
-      aggFunction: String = "",
       canBeNull: Boolean = false,
       timeseriesFunction: String = ""): AttributeReference = {
-    val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
-    val columnSchema = if (aggFunction.isEmpty && timeseriesFunction.isEmpty) {
+    val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]
+    val columnSchema = if (timeseriesFunction.isEmpty) {
       aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase)
-    } else if (!aggFunction.isEmpty) {
-      aggregationDataMapSchema.getAggChildColByParent(attributeReference.name.toLowerCase,
-        aggFunction)
     } else {
       aggregationDataMapSchema
         .getTimeseriesChildColByParent(attributeReference.name.toLowerCase,
@@ -499,254 +642,6 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 
   /**
-   * Below method will be used to transform the main table plan to child table plan
-   * rules for transformming is as below.
-   * 1. Grouping expression rules
-   *    1.1 Change the parent attribute reference for of group expression
-   * to child attribute reference
-   *
-   * 2. Aggregate expression rules
-   *    2.1 Change the parent attribute reference for of group expression to
-   * child attribute reference
-   *    2.2 Change the count AggregateExpression to Sum as count
-   * is already calculated so in case of aggregate table
-   * we need to apply sum to get the count
-   *    2.2 In case of average aggregate function select 2 columns from aggregate table with
-   * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)).
-   * Note: During aggregate table creation for average table will be created with two columns
-   * one for sum(column) and count(column) to support rollup
-   * 3. Filter Expression rules.
-   *    3.1 Updated filter expression attributes with child table attributes
-   * 4. Update the Parent Logical relation with child Logical relation
-   * 5. Order by plan rules.
-   *    5.1 Update project list based on updated aggregate expression
-   *    5.2 Update sort order attributes based on pre aggregate table
-   * 6. timeseries function
-   *    6.1 validate parent table has timeseries datamap
-   *    6.2 timeseries function is valid function or not
-   *
-   * @param logicalPlan
-   * parent logical plan
-   * @param aggDataMapSchema
-   * select data map schema
-   * @param childPlan
-   * child carbon table relation
-   * @return transformed plan
-   */
-  def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
-      aggDataMapSchema: DataMapSchema,
-      childPlan: LogicalPlan): LogicalPlan = {
-    val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
-    logicalPlan.transform {
-      // case for aggregation query
-      case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, None) =
-          getUpdatedExpressions(grExp,
-            aggExp,
-            child,
-            None,
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        Aggregate(updatedGroupExp,
-          updatedAggExp,
-          newChild)
-        // case of handling aggregation query with filter
-      case Aggregate(grExp,
-        aggExp,
-        Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
-          getUpdatedExpressions(grExp,
-            aggExp,
-            child,
-            Some(expression),
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        Aggregate(updatedGroupExp,
-          updatedAggExp,
-          Filter(updatedFilterExpression.get,
-            newChild))
-        // case for aggregation query
-      case Aggregate(grExp, aggExp, l: LogicalRelation)
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, None) =
-          getUpdatedExpressions(grExp,
-            aggExp,
-            l,
-            None,
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        Aggregate(updatedGroupExp,
-          updatedAggExp,
-          newChild)
-        // case for aggregation query with order by
-      case Project(_,
-        Sort(sortOrders,
-          global,
-          Aggregate(groupingExp,
-            aggregateExp,
-            subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, None) =
-          getUpdatedExpressions(groupingExp,
-            aggregateExp,
-            subQuery,
-            None,
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
-          sortOrders,
-          aggDataMapSchema,
-          attributes)
-        Project(updatedProjectList,
-          Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild)))
-       // case for handling aggregation query with filter and order by
-      case Project(_,
-        Sort(sortOrders,
-          global,
-          Aggregate(groupingExp,
-            aggregateExp,
-            Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
-          getUpdatedExpressions(groupingExp,
-            aggregateExp,
-            subQuery,
-            Some(expression),
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        val (updatedProjectList, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
-          sortOrders,
-          aggDataMapSchema,
-          attributes)
-        Project(updatedProjectList,
-          Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp,
-            Filter(updatedFilterExpression.get, newChild))))
-      // case for handling aggregation with order by when only projection column exits
-      case Sort(sortOrders,
-        global,
-        Aggregate(
-          groupingExp,
-          aggregateExp,
-          subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
-        if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, None) =
-          getUpdatedExpressions(groupingExp,
-            aggregateExp,
-            subQuery,
-            None,
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
-          sortOrders,
-          aggDataMapSchema,
-          attributes)
-        Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))
-      // case for handling aggregation with order by and filter when only projection column exits
-      case Sort(sortOrders,
-        global,
-        Aggregate(groupingExp,
-          aggregateExp,
-          Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))
-        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
-           l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
-             metaData.hasAggregateDataMapSchema =>
-        val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
-          getUpdatedExpressions(groupingExp,
-            aggregateExp,
-            subQuery,
-            Some(expression),
-            aggDataMapSchema,
-            attributes,
-            childPlan)
-        val (_, updatedSortOrder) = transformPlanForOrderBy(updatedAggExp,
-          sortOrders,
-          aggDataMapSchema,
-          attributes)
-        Sort(updatedSortOrder, global, Aggregate(updatedGroupExp, updatedAggExp, newChild))
-    }
-  }
-
-  /**
-   * Below method will be used to updated the maintable plan for order by query
-   * In case of order by we need to update project list and sort order attributes.
-   *
-   * @param aggregateExp
-   *                     child table aggregate expression
-   * @param sortOrders
-   *                   sort order expression in maintable plan
-   * @param aggDataMapSchema
-   *                         child data map schema
-   * @param attributes
-   *                   child attributes
-   * @return updated project list and updated sort order
-   */
-  def transformPlanForOrderBy(aggregateExp: Seq[NamedExpression],
-      sortOrders: Seq[SortOrder], aggDataMapSchema: DataMapSchema,
-      attributes: Seq[AttributeReference]): (Seq[NamedExpression], Seq[SortOrder]) = {
-    val updatedProjectList = new ArrayBuffer[NamedExpression]()
-    // getting the updated project list from aggregate expression
-    aggregateExp.foreach{f => f.transform {
-      // for projection column
-      case alias@Alias(attr: AttributeReference, name) =>
-        updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId,
-          alias.qualifier,
-          alias.isGenerated)
-        alias
-        // for aggregaton column
-      case alias@Alias(attr: AggregateExpression, name) =>
-        updatedProjectList += AttributeReference(name, attr.dataType, attr.nullable)(alias.exprId,
-          alias.qualifier,
-          alias.isGenerated)
-        alias
-      case alias@Alias(exp: Expression, name) =>
-        updatedProjectList += AttributeReference(name, exp.dataType, exp.nullable)(alias.exprId,
-          alias.qualifier,
-          alias.isGenerated)
-        alias
-    }
-    }
-    // getting the updated sort order
-    val updatedSortOrders = sortOrders.map { order =>
-      order.child match {
-        case attr: AttributeReference =>
-          val childAttribute = getChildAttributeReference(aggDataMapSchema,
-            attr,
-            attributes,
-            canBeNull = true)
-          // child attribute can be null only in case of alias in query
-          // so in that case we need to update the sortorder based on new alias
-          if (null != childAttribute) {
-            val childExpression = getUpdatedSortOrderExpression(childAttribute, aggregateExp)
-            SortOrder(childExpression, order.direction)
-          } else {
-            val childExpression = getUpdatedSortOrderExpression(attr, aggregateExp)
-            SortOrder(childExpression, order.direction)
-          }
-      }
-    }
-    (updatedProjectList, updatedSortOrders)
-  }
-  /**
    * Below method will be used to get the updated expression for pre aggregated table.
    * It will replace the attribute of actual plan with child table attributes.
    * The update will be done for the below expressions.
@@ -755,37 +650,41 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * 3. child logical plan
    * 4. filter expression if present
    *
-   * @param groupingExpressions
-   * actual plan grouping expression
-   * @param aggregateExpressions
-   * actual plan aggregate expression
-   * @param child
-   * child logical plan
-   * @param filterExpression
-   * filter expression
-   * @param aggDataMapSchema
-   * pre aggregate table schema
-   * @param attributes
-   * pre aggregate table logical relation
-   * @param aggPlan
-   *                aggregate logical plan
+   * @param groupingExpressions actual plan grouping expression
+   * @param aggregateExpressions actual plan aggregate expression
+   * @param child child logical plan
+   * @param filterExpression filter expression
+   * @param aggDataMapSchema pre aggregate table schema
+   * @param attributes pre aggregate table logical relation
+   * @param aggPlan aggregate logical plan
    * @return tuple of(updated grouping expression,
-   *         updated aggregate expression,
-   *         updated child logical plan,
-   *         updated filter expression if present in actual plan)
+   * updated aggregate expression,
+   * updated child logical plan,
+   * updated filter expression if present in actual plan)
    */
   def getUpdatedExpressions(groupingExpressions: Seq[Expression],
       aggregateExpressions: Seq[NamedExpression],
       child: LogicalPlan, filterExpression: Option[Expression] = None,
-      aggDataMapSchema: DataMapSchema,
+      aggDataMapSchema: AggregationDataMapSchema,
       attributes: Seq[AttributeReference],
-      aggPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
-      Option[Expression]) = {
+      aggPlan: LogicalPlan,
+      parentTable: CarbonTable,
+      parentLogicalPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
+    Option[Expression]) = {
+    val aggExpColumnMapping = if (null != aggDataMapSchema.getAggExpToColumnMapping) {
+      Some(aggDataMapSchema.getAggExpToColumnMapping
+        .asInstanceOf[java.util.Set[AggExpToColumnMappingModel]].asScala
+        .asInstanceOf[mutable.LinkedHashSet[AggExpToColumnMappingModel]])
+    } else {
+      None
+    }
+
     // transforming the group by expression attributes with child attributes
     val updatedGroupExp = groupingExpressions.map { exp =>
       exp.transform {
         case attr: AttributeReference =>
-          getChildAttributeReference(aggDataMapSchema, attr, attributes)
+          val childAttr = getChildAttributeReference(aggDataMapSchema, attr, attributes)
+          childAttr
       }
     }
     // below code is for updating the aggregate expression.
@@ -806,59 +705,97 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
     val updatedAggExp = aggregateExpressions.map {
       // case for attribute reference
       case attr: AttributeReference =>
-        val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+        val childAttr = getChildAttributeReference(aggDataMapSchema,
           attr,
           attributes)
+        val newExpressionId = NamedExpression.newExprId
+        val childTableAttr = AttributeReference(attr.name,
+          childAttr.dataType,
+          childAttr.nullable,
+          childAttr.metadata)(newExpressionId, childAttr.qualifier, attr.isGenerated)
+        updatedExpression.put(attr, childTableAttr)
         // returning the alias to show proper column name in output
-        Alias(childAttributeReference,
-          attr.name)(NamedExpression.newExprId,
-          childAttributeReference.qualifier).asInstanceOf[NamedExpression]
+        Alias(childAttr,
+          attr.name)(newExpressionId,
+          childAttr.qualifier).asInstanceOf[NamedExpression]
       // case for alias
-      case Alias(attr: AttributeReference, name) =>
-        val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+      case alias@Alias(attr: AttributeReference, name) =>
+        val childAttr = getChildAttributeReference(aggDataMapSchema,
           attr,
           attributes)
+        val newExpressionId = NamedExpression.newExprId
+        val parentTableAttr = AttributeReference(name,
+          alias.dataType,
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val childTableAttr = AttributeReference(name,
+          alias.dataType,
+          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+        updatedExpression.put(parentTableAttr, childTableAttr)
         // returning alias with child attribute reference
-        Alias(childAttributeReference,
-          name)(NamedExpression.newExprId,
-          childAttributeReference.qualifier).asInstanceOf[NamedExpression]
+        Alias(childAttr,
+          name)(newExpressionId,
+          childAttr.qualifier).asInstanceOf[NamedExpression]
       // for aggregate function case
       case alias@Alias(attr: AggregateExpression, name) =>
         // get the updated aggregate function
-        val aggExp = getUpdatedAggregateExpressionForChild(attr,
-          aggDataMapSchema,
-          attributes)
+        val aggExp = if (aggExpColumnMapping.isDefined) {
+          getUpdatedAggregateExpressionForChild(attr,
+            aggDataMapSchema,
+            attributes,
+            parentTable,
+            parentLogicalPlan,
+            aggExpColumnMapping.get)
+        } else {
+          attr
+        }
+        val newExpressionId = NamedExpression.newExprId
+        val parentTableAttr = AttributeReference(name,
+          alias.dataType,
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val childTableAttr = AttributeReference(name,
+          alias.dataType,
+          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+        updatedExpression.put(parentTableAttr, childTableAttr)
         // returning alias with child attribute reference
         Alias(aggExp,
-          name)(NamedExpression.newExprId,
+          name)(newExpressionId,
           alias.qualifier).asInstanceOf[NamedExpression]
       case alias@Alias(expression: Expression, name) =>
         val updatedExp =
           if (expression.isInstanceOf[ScalaUDF] &&
               expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
                 "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction")) {
-          expression.asInstanceOf[ScalaUDF].transform {
-            case attr: AttributeReference =>
-            val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
-              attr,
-              attributes,
-              timeseriesFunction =
-                expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value.toString)
-            childAttributeReference
-          }
-        } else {
-          expression.transform{
-            case attr: AttributeReference =>
-              val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
-                attr,
-                attributes)
-              childAttributeReference
+            expression.asInstanceOf[ScalaUDF].transform {
+              case attr: AttributeReference =>
+                val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+                  attr,
+                  attributes,
+                  timeseriesFunction =
+                    expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value
+                      .toString)
+                childAttributeReference
+            }
+          } else {
+            expression.transform {
+              case attr: AttributeReference =>
+                val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+                  attr,
+                  attributes)
+                childAttributeReference
+            }
           }
-        }
-        Alias(updatedExp, name)(NamedExpression.newExprId,
+        val newExpressionId = NamedExpression.newExprId
+        val parentTableAttr = AttributeReference(name,
+          alias.dataType,
+          alias.nullable) (alias.exprId, alias.qualifier, alias.isGenerated)
+        val childTableAttr = AttributeReference(name,
+          alias.dataType,
+          alias.nullable) (newExpressionId, alias.qualifier, alias.isGenerated)
+        updatedExpression.put(parentTableAttr, childTableAttr)
+        Alias(updatedExp, name)(newExpressionId,
           alias.qualifier).asInstanceOf[NamedExpression]
     }
-    // transformaing the logical relation
+    // transforming the logical relation
     val newChild = child.transform {
       case _: LogicalRelation =>
         aggPlan
@@ -866,7 +803,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
         aggPlan match {
           case s: SubqueryAlias => s.child
           case others => others
-    }
+        }
     }
     // updating the filter expression if present
     val updatedFilterExpression = if (filterExpression.isDefined) {
@@ -882,39 +819,6 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
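
For illustration, a minimal sketch (hypothetical names, assuming the Spark 2.x catalyst API used in the diff above) of how the rule re-keys a parent-table attribute to a child-table attribute under a fresh expression id, recording the pair so later plan updates (e.g. sort order) can consult the mapping:

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
import org.apache.spark.sql.types.LongType

// mapping from parent-table attributes to their child-table replacements
val updatedExpression = mutable.HashMap.empty[AttributeReference, AttributeReference]

val parentAttr = AttributeReference("price", LongType, nullable = true)()
// same user-visible name, but a fresh ExprId bound to the child (pre-aggregate) table
val childAttr = AttributeReference(parentAttr.name,
  parentAttr.dataType,
  parentAttr.nullable,
  parentAttr.metadata)(NamedExpression.newExprId, parentAttr.qualifier, parentAttr.isGenerated)
updatedExpression.put(parentAttr, childAttr)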
 
   /**
-   * Below method will be used to get the updated sort order attribute
-   * based on pre aggregate table
-   * @param sortOrderAttr
-   *                      sort order attributes reference
-   * @param aggregateExpressions
-   *                             aggregate expression
-   * @return updated sortorder attribute
-   */
-  def getUpdatedSortOrderExpression(sortOrderAttr: AttributeReference,
-      aggregateExpressions: Seq[NamedExpression]): Expression = {
-    val updatedExpression = aggregateExpressions collectFirst {
-      // in case of alias we need to match with alias name and when alias is not present
-      // we need to compare with attribute reference name
-      case alias@Alias(attr: AttributeReference, name)
-        if attr.name.equalsIgnoreCase(sortOrderAttr.name) ||
-           name.equalsIgnoreCase(sortOrderAttr.name) =>
-          AttributeReference(name,
-            sortOrderAttr.dataType,
-            sortOrderAttr.nullable,
-            sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
-      case alias@Alias(_: Expression, name)
-        if name.equalsIgnoreCase(sortOrderAttr.name) =>
-          AttributeReference(name,
-          sortOrderAttr.dataType,
-          sortOrderAttr.nullable,
-          sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated)
-    }
-    // any case it will match the condition, so no need to check whether updated expression is empty
-    // or not
-    updatedExpression.get
-  }
-
-  /**
    * Below method will be used to get the aggregate expression based on match
    * Aggregate expression update rules
    * 1 Change the count AggregateExpression to Sum as count
@@ -927,166 +831,120 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
    * table will be created with two columns one for sum(column) and count(column)
    * to support rollup
    *
-   * @param aggExp
-   * aggregate expression
-   * @param dataMapSchema
-   * child data map schema
-   * @param attributes
-   * child logical relation
+   * @param aggExp aggregate expression
+   * @param dataMapSchema child data map schema
+   * @param attributes child logical relation
+   * @param parentTable parent carbon table
+   * @param parentLogicalPlan logical relation
    * @return updated expression
    */
   def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
-      dataMapSchema: DataMapSchema,
-      attributes: Seq[AttributeReference]):
+      dataMapSchema: AggregationDataMapSchema,
+      attributes: Seq[AttributeReference],
+      parentTable: CarbonTable,
+      parentLogicalPlan: LogicalPlan,
+      aggExpColumnMapping: mutable.LinkedHashSet[AggExpToColumnMappingModel]):
   Expression = {
+    // get the updated aggregate expression; in case of an average column
+    // it will be split into two aggregate expressions
+    val updatedAggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
+    // get the attributes to be updated for child table
+    val attrs = aggExpColumnMapping.collect {
+      case schemaAggExpModel
+        if updatedAggExp
+          .exists(p =>
+            schemaAggExpModel.expression ==
+            PreAggregateUtil.normalizeExprId(p, parentLogicalPlan.allAttributes)) =>
+        attributes filter (_.name.equalsIgnoreCase(
+          schemaAggExpModel.columnSchema.get.asInstanceOf[ColumnSchema].getColumnName))
+    }.flatten
+
     aggExp.aggregateFunction match {
+      case Sum(MatchCastExpression(_, changeDataType: DataType)) =>
+        AggregateExpression(Sum(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false)
+      case Sum(_) =>
+        AggregateExpression(Sum(attrs.head), aggExp.mode, isDistinct = false)
+      case Max(MatchCastExpression(_, changeDataType: DataType)) =>
+        AggregateExpression(Max(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false)
+      case Max(_) =>
+        AggregateExpression(Max(attrs.head), aggExp.mode, isDistinct = false)
+      case Min(MatchCastExpression(_, changeDataType: DataType)) =>
+        AggregateExpression(Min(Cast(attrs.head, changeDataType)), aggExp.mode, isDistinct = false)
+      case Min(_) =>
+        AggregateExpression(Min(attrs.head), aggExp.mode, isDistinct = false)
       // Change the count AggregateExpression to Sum as count
       // is already calculated so in case of aggregate table
       // we need to apply sum to get the count
-      case count@Count(Seq(attr: AttributeReference)) =>
-        AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          count.prettyName),
-          LongType)),
-          aggExp.mode,
-          isDistinct = false)
-      case sum@Sum(attr: AttributeReference) =>
-        AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          sum.prettyName)),
-          aggExp.mode,
-          isDistinct = false)
-      case max@Max(attr: AttributeReference) =>
-        AggregateExpression(Max(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          max.prettyName)),
-          aggExp.mode,
-          isDistinct = false)
-      case min@Min(attr: AttributeReference) =>
-        AggregateExpression(Min(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          min.prettyName)),
-          aggExp.mode,
-          isDistinct = false)
-      case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          sum.prettyName),
-          changeDataType)),
-          aggExp.mode,
-          isDistinct = false)
-      case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        AggregateExpression(Min(Cast(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          min.prettyName),
-          changeDataType)),
-          aggExp.mode,
-          isDistinct = false)
-      case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        AggregateExpression(Max(Cast(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          max.prettyName),
-          changeDataType)),
-          aggExp.mode,
-          isDistinct = false)
-
+      case Count(Seq(expression: Expression)) =>
+        AggregateExpression(Sum(Cast(attrs.head, LongType)), aggExp.mode, isDistinct = false)
       // In case of average aggregate function select 2 columns from aggregate table
       // with aggregation sum and count.
       // Then add divide(sum(column with sum), sum(column with count)).
-      case Average(attr: AttributeReference) =>
-        Divide(AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          "sum")),
+      case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+        Divide(AggregateExpression(Sum(Cast(
+          attrs.head,
+          DoubleType)),
           aggExp.mode,
           isDistinct = false),
-          AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
-            attr,
-            attributes,
-            "count")),
+          AggregateExpression(Sum(Cast(
+            attrs.last,
+            DoubleType)),
             aggExp.mode,
             isDistinct = false))
       // In case of average aggregate function select 2 columns from aggregate table
       // with aggregation sum and count.
       // Then add divide(sum(column with sum), sum(column with count)).
-      case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        Divide(AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
-          attr,
-          attributes,
-          "sum"),
-          DoubleType)),
+      case Average(exp: Expression) =>
+        Divide(AggregateExpression(Sum(attrs.head),
           aggExp.mode,
           isDistinct = false),
-          AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
-            attr,
-            attributes,
-            "count"),
-            DoubleType)),
+          AggregateExpression(Sum(attrs.last),
             aggExp.mode,
             isDistinct = false))
     }
   }
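
To make the rollup rules above concrete, here is a hedged sketch (hypothetical column names, Complete aggregation mode assumed) of the expression the rule produces for avg(price) when the child table materializes price_sum and price_count:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Divide}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
import org.apache.spark.sql.types.DoubleType

// columns materialized in the pre-aggregate (child) table
val sumCol = AttributeReference("price_sum", DoubleType, nullable = true)()
val countCol = AttributeReference("price_count", DoubleType, nullable = true)()

// avg(price) on the parent table is answered as sum(price_sum) / sum(price_count)
val rewritten = Divide(
  AggregateExpression(Sum(sumCol), Complete, isDistinct = false),
  AggregateExpression(Sum(countCol), Complete, isDistinct = false))

The count rewrite has the same shape: a count already materialized in the child table is summed rather than recounted.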
-
   /**
    * Method to get the carbon table
-   *
-   * @param parentLogicalRelation
-   * parent table relation
-   * @return tuple of carbon table and table name
+   * @param parentLogicalRelation parent table relation
+   * @return parent carbon table
    */
-  def getCarbonTableAndTableName(parentLogicalRelation: LogicalRelation): (CarbonTable, String) = {
+  def getCarbonTable(parentLogicalRelation: LogicalRelation): CarbonTable = {
     val carbonTable = parentLogicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
       .carbonRelation
       .metaData.carbonTable
-    val tableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-      .getTableName
-    (carbonTable, tableName)
+    carbonTable
   }
 
   /**
    * Below method will be used to get the query columns from plan
-   *
-   * @param groupByExpression
-   * group by expression
-   * @param aggregateExpressions
-   * aggregate expression
-   * @param carbonTable
-   * parent carbon table
-   * @param tableName
-   * parent table name
-   * @param set
-   * list of attributes
+   * @param groupByExpression group by expressions
+   * @param aggregateExpressions aggregate expressions
+   * @param carbonTable parent carbon table
+   * @param queryColumns set of query columns extracted from the plan
+   * @param aggregateExps set of aggregate expressions extracted from the plan
    * @return whether the plan is valid
    */
   def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression],
       aggregateExpressions: Seq[NamedExpression],
-      carbonTable: CarbonTable, tableName: String,
-      set: scala.collection.mutable.HashSet[QueryColumn]): Boolean = {
+      carbonTable: CarbonTable,
+      queryColumns: scala.collection.mutable.HashSet[QueryColumn],
+      aggregateExps: scala.collection.mutable.HashSet[AggregateExpression]): Boolean = {
+    groupByExpression foreach { expression =>
+      extractColumnFromExpression(expression, queryColumns, carbonTable)
+    }
     aggregateExpressions.map {
       case attr: AttributeReference =>
-        set += getQueryColumn(attr.name,
-          carbonTable,
-          tableName)
+        queryColumns += getQueryColumn(attr.name,
+          carbonTable)
       case Alias(attr: AttributeReference, _) =>
-        set += getQueryColumn(attr.name,
-          carbonTable,
-          tableName);
+        queryColumns += getQueryColumn(attr.name,
+          carbonTable)
       case Alias(attr: AggregateExpression, _) =>
         if (attr.isDistinct) {
           return false
         }
-        val queryColumn = validateAggregateFunctionAndGetFields(carbonTable,
-          attr.aggregateFunction,
-          tableName)
-        if (queryColumn.nonEmpty) {
-          set ++= queryColumn
+        val aggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(attr)
+        if (aggExp.nonEmpty) {
+          aggregateExps ++= aggExp
         } else {
           return false
         }
@@ -1095,18 +953,16 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
               "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
             CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
-          set += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0)
+          queryColumns += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0)
             .asInstanceOf[AttributeReference].name,
             carbonTable,
-            tableName,
             timeseriesFunction = expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal]
               .value.toString)
         } else {
           expression.transform {
             case attr: AttributeReference =>
-              set += getQueryColumn(attr.name,
-                carbonTable,
-                tableName)
+              queryColumns += getQueryColumn(attr.name,
+                carbonTable)
               attr
           }
         }
@@ -1115,215 +971,112 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
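
A hedged usage sketch of the extraction contract above (caller-side names hypothetical): the method fills the two mutable sets passed in, and a false return marks the plan as unservable from a pre-aggregate table:

val queryColumns = scala.collection.mutable.HashSet.empty[QueryColumn]
val aggregateExps = scala.collection.mutable.HashSet.empty[AggregateExpression]
val isValid = extractQueryColumnsFromAggExpression(
  groupByExpression, aggregateExpressions, carbonTable, queryColumns, aggregateExps)
if (!isValid) {
  // a distinct aggregate or an unsupported function was found;
  // leave the plan untouched and query the parent table directly
}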
 
   /**
-   * Below method will be used to validate aggregate function and get the attribute information
-   * which is applied on select query.
-   * Currently sum, max, min, count, avg is supported
-   * in case of any other aggregate function it will return empty sequence
-   * In case of avg it will return two fields one for count
-   * and other of sum of that column to support rollup
-   *
-   * @param carbonTable
-   * parent table
-   * @param aggFunctions
-   * aggregation function
-   * @param tableName
-   * parent table name
-   * @return list of fields
-   */
-  def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
-      aggFunctions: AggregateFunction,
-      tableName: String
-  ): Seq[QueryColumn] = {
-    val changedDataType = true
-    aggFunctions match {
-      case sum@Sum(attr: AttributeReference) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          sum.prettyName))
-      case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          sum.prettyName,
-          changeDataType.typeName,
-          changedDataType))
-      case count@Count(Seq(attr: AttributeReference)) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          count.prettyName))
-      case min@Min(attr: AttributeReference) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          min.prettyName))
-      case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          min.prettyName,
-          changeDataType.typeName,
-          changedDataType))
-      case max@Max(attr: AttributeReference) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          max.prettyName))
-      case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          max.prettyName,
-          changeDataType.typeName,
-          changedDataType))
-      // in case of average need to return two columns
-      // sum and count of the column to added during table creation to support rollup
-      case Average(attr: AttributeReference) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          "sum"
-        ), getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          "count"
-        ))
-      // in case of average need to return two columns
-      // sum and count of the column to added during table creation to support rollup
-      case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
-        Seq(getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          "sum",
-          changeDataType.typeName,
-          changedDataType), getQueryColumn(attr.name,
-          carbonTable,
-          tableName,
-          "count",
-          changeDataType.typeName,
-          changedDataType))
-      case _ =>
-        Seq.empty
-    }
-  }
-
-
-
-  /**
    * Below method will be used to get the query column object which
    * will have details of the column and its property
    *
-   * @param columnName
-   * parent column name
-   * @param carbonTable
-   * parent carbon table
-   * @param tableName
-   * parent table name
-   * @param aggFunction
-   * aggregate function applied
-   * @param dataType
-   * data type of the column
-   * @param isChangedDataType
-   * is cast is applied on column
-   * @param isFilterColumn
-   * is filter is applied on column
+   * @param columnName parent column name
+   * @param carbonTable parent carbon table
+   * @param isFilterColumn whether a filter is applied on the column
    * @return query column
    */
   def getQueryColumn(columnName: String,
       carbonTable: CarbonTable,
-      tableName: String,
-      aggFunction: String = "",
-      dataType: String = "",
-      isChangedDataType: Boolean = false,
       isFilterColumn: Boolean = false,
       timeseriesFunction: String = ""): QueryColumn = {
-    val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase)
+    val columnSchema = carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase)
     if(null == columnSchema) {
       null
     } else {
-      if (isChangedDataType) {
-        new QueryColumn(columnSchema.getColumnSchema,
-          columnSchema.getDataType.getName,
-          aggFunction.toLowerCase,
-          isFilterColumn,
-          timeseriesFunction.toLowerCase)
-      } else {
         new QueryColumn(columnSchema.getColumnSchema,
-          CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
-          aggFunction.toLowerCase,
           isFilterColumn,
           timeseriesFunction.toLowerCase)
-      }
     }
   }
 }
 
-object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
-
+/**
+ * Data loading rule class to validate and update the data loading query plan
+ * Validation rules:
+ * 1. Update the avg aggregate expression into two expressions, sum and count
+ * 2. Remove duplicate sum and count expressions if already present in the plan
+ * @param sparkSession spark session
+ */
+case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
+  extends Rule[LogicalPlan] {
+  lazy val parser = new CarbonSpark2SqlParser
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
+    val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel]
+    val namedExpressionList = scala.collection.mutable.LinkedHashSet.empty[NamedExpression]
     plan transform {
-      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
+      case aggregate@Aggregate(_,
+      aExp,
+      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
+        if validateAggregateExpressions(aExp) &&
+        logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
         aExp.foreach {
-          case alias: Alias =>
-            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
-          case _: UnresolvedAlias =>
-          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
+            case attr: AttributeReference =>
+              namedExpressionList += attr
+            case alias@Alias(_: AttributeReference, _) =>
+              namedExpressionList += alias
+            case alias@Alias(aggExp: AggregateExpression, name) =>
+              // get the updated expressions; avg is converted into two
+              // expressions, sum and count
+              val expressions = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
+              // a size greater than one means the original function was average
+              if (expressions.size > 1) {
+                val sumExp = PreAggregateUtil.normalizeExprId(
+                  expressions.head,
+                  aggregate.allAttributes)
+                // get the normalized count expression
+                val countExp = PreAggregateUtil.normalizeExprId(
+                  expressions.last,
+                  aggregate.allAttributes)
+                // if a sum over the same expression is already present, do not add it
+                // to the named expression list; otherwise add it to both list and set
+                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
+                  namedExpressionList +=
+                  Alias(expressions.head, name + "_ sum")(NamedExpression.newExprId,
+                    alias.qualifier,
+                    Some(alias.metadata),
+                    alias.isGenerated)
+                  validExpressionsMap += AggExpToColumnMappingModel(sumExp)
+                }
+                // if a count over the same expression is already present, do not add it
+                // to the named expression list; otherwise add it to both list and set
+                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
+                  namedExpressionList +=
+                  Alias(expressions.last, name + "_ count")(NamedExpression.newExprId,
+                    alias.qualifier,
+                    Some(alias.metadata),
+                    alias.isGenerated)
+                  validExpressionsMap += AggExpToColumnMappingModel(countExp)
+                }
+              } else {
+                // get the normalized expression
+                val exp = PreAggregateUtil.normalizeExprId(
+                  expressions.head,
+                  aggregate.allAttributes)
+                // if the same expression is already present, do not add it to the
+                // named expression list; otherwise add it to both list and set
+                if (!validExpressionsMap.contains(AggExpToColumnMappingModel(exp))) {
+                  namedExpressionList += alias
+                  validExpressionsMap += AggExpToColumnMappingModel(exp)
+                }
+              }
+            case alias@Alias(_: Expression, _) =>
+              namedExpressionList += alias
         }
-        aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq)
+        aggregate.copy(aggregateExpressions = namedExpressionList.toSeq)
       case plan: LogicalPlan => plan
     }
   }
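
As a hedged illustration of validation rule 1 above (hypothetical column name, Complete mode assumed): avg(price) is never stored directly; the loading plan materializes a sum and a count expression, each under a fresh result id, mirroring the alias naming used in the rule:

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Complete, Count, Sum}
import org.apache.spark.sql.types.DoubleType

val price = AttributeReference("price", DoubleType, nullable = true)()
val avgExp = AggregateExpression(Average(price), Complete, isDistinct = false)

// split avg into the two expressions the child table actually stores
val sumAlias = Alias(
  avgExp.copy(aggregateFunction = Sum(price), resultId = NamedExpression.newExprId),
  "price_avg_sum")()
val countAlias = Alias(
  avgExp.copy(aggregateFunction = Count(price), resultId = NamedExpression.newExprId),
  "price_avg_count")()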
 
-    /**
-     * This method will split the avg column into sum and count and will return a sequence of tuple
-     * of unique name, alias
-     *
-     */
-    private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String,
-      NamedExpression)] = {
-      alias match {
-        case udf@Alias(_: ScalaUDF, name) =>
-          Seq((name, udf))
-        case alias@Alias(attrExpression: AggregateExpression, _) =>
-          attrExpression.aggregateFunction match {
-            case Sum(attr: AttributeReference) =>
-              (attr.name + "_sum", alias) :: Nil
-            case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
-              (attr.name + "_sum", alias) :: Nil
-            case Count(Seq(attr: AttributeReference)) =>
-              (attr.name + "_count", alias) :: Nil
-            case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) =>
-              (attr.name + "_count", alias) :: Nil
-            case Average(attr: AttributeReference) =>
-              Seq((attr.name + "_sum", Alias(attrExpression.
-                copy(aggregateFunction = Sum(attr),
-                  resultId = NamedExpression.newExprId), attr.name + "_sum")()),
-                (attr.name, Alias(attrExpression.
-                  copy(aggregateFunction = Count(attr),
-                    resultId = NamedExpression.newExprId), attr.name + "_count")()))
-            case Average(cast@MatchCastExpression(attr: AttributeReference, _)) =>
-              Seq((attr.name + "_sum", Alias(attrExpression.
-                copy(aggregateFunction = Sum(cast),
-                  resultId = NamedExpression.newExprId),
-                attr.name + "_sum")()),
-                (attr.name, Alias(attrExpression.
-                  copy(aggregateFunction = Count(cast), resultId =
-                    NamedExpression.newExprId), attr.name + "_count")()))
-            case _ => Seq(("", alias))
-          }
-
-      }
-    }
-
   /**
    * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
    * If the plan has PreAggLoad (i.e. the loading UDF) and does not have PreAgg (i.e. the
    * query UDF), then it is valid.
-   *
-   * @param namedExpression
-   * @return
+   * @param namedExpression named expressions from the aggregate plan
+   * @return true if the plan is valid for applying the rules
    */
   private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
     val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 3d84dd3..a1fa382 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -262,7 +262,7 @@ class CarbonAnalyzer(catalog: SessionCatalog,
     analyzer: Analyzer) extends Analyzer(catalog, conf) {
   override def execute(plan: LogicalPlan): LogicalPlan = {
     var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
     CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
   }
 }
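
The call-site change in both session states follows from the loading rule becoming a case class parameterized by the session instead of a singleton object; a minimal sketch of the distinction (surrounding analyzer context assumed):

// before: a singleton Rule[LogicalPlan], invoked directly on the plan
//   logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan)
// after: construct the rule with the SparkSession, then apply it to the plan
val loadingRule = CarbonPreAggregateDataLoadingRules(sparkSession)
logicalPlan = loadingRule.apply(logicalPlan)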

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c70e73f1/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index f563007..5046541 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -190,7 +190,7 @@ class CarbonAnalyzer(catalog: SessionCatalog,
     analyzer: Analyzer) extends Analyzer(catalog, conf) {
   override def execute(plan: LogicalPlan): LogicalPlan = {
     var logicalPlan = analyzer.execute(plan)
-    logicalPlan = CarbonPreAggregateDataLoadingRules(logicalPlan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
     CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
   }
 }

