carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akash...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4076] Fix MV having Subquery alias used in query projection #4038
Date Fri, 18 Dec 2020 12:12:51 GMT
This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 88566e0  [CARBONDATA-4076] Fix MV having Subquery alias used in query projection #4038
88566e0 is described below

commit 88566e076ad98c524009a58e03ecc0e134e27111
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Thu Dec 3 17:44:50 2020 +0530

    [CARBONDATA-4076] Fix MV having Subquery alias used in query projection #4038
    
    Why is this PR needed?
    If an alias is defined in a subquery and that alias is then used in the main query's
    projection, the query does not hit the MV after the MV is created.
    
    For example:
    
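    select c, sum(b) from (select c, d b from t1) group by c
    
    Here b is an alias defined in the subquery for column d, and the main query
    aggregates on that alias.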
    
    This happens because the child expression of a subquery alias is currently not
    checked when matching the query against the MV.
    
    What changes were proposed in this PR?
    1. If the output list contains an Alias, compare the alias's child expression of the
       subsumee/subsumer.
    2. In GroupbyGroupbyNoChildDelta, also compare the alias SQL text (alias.sql) of the
       subsumee/subsumer.
    3. Added logs for MV rewrite and matching.
    
    This closes #4038
---
 .../org/apache/carbondata/core/view/MVManager.java |   6 +-
 .../org/apache/spark/sql/optimizer/MVMatcher.scala | 103 +++++++++----
 .../org/apache/spark/sql/optimizer/MVRewrite.scala | 167 +++++++++++++--------
 .../apache/spark/sql/optimizer/MVRewriteRule.scala |  19 ++-
 .../carbondata/view/rewrite/MVCreateTestCase.scala |  34 +++++
 5 files changed, 233 insertions(+), 96 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index bc02207..62f0583 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -177,7 +177,11 @@ public abstract class MVManager {
               catalog.registerSchema(schema);
             } catch (Exception e) {
               // Ignore the schema
-              LOGGER.error("Error while registering schema", e);
+              LOGGER.error("Error while registering schema for mv: " + schema.getIdentifier()
+                  .getTableName());
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(e.getMessage());
+              }
             }
           }
           this.catalog = catalog;
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
index d562b9d..83fb83e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.optimizer
 
+import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.CarbonToSparkAdapter
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference,
AttributeSet, Expression, PredicateHelper, _}
@@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
 import org.apache.spark.sql.types.{DataType, Metadata}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
 import org.apache.carbondata.mv.plans.modular
 import org.apache.carbondata.mv.plans.modular.Flags._
@@ -229,6 +231,34 @@ private abstract class MVMatchPattern extends Logging {
   }
 
   /**
+   * Compares the output lists of subsumer/subsumee with or without aliases: returns true
+   * if every expression in subsumerOutputList has a match in subsumeeOutputList. If an
+   * expression is an instance of Alias, its child expression is compared.
+   */
+  protected def compareOutputList(subsumerOutputList: Seq[NamedExpression],
+      subsumeeOutputList: Seq[NamedExpression]): Boolean = {
+    subsumerOutputList.forall {
+      case a@Alias(cast: Cast, _) =>
+        subsumeeOutputList.exists {
+          case Alias(castExp: Cast, _) => castExp.child.semanticEquals(cast.child)
+          case alias: Alias => alias.child.semanticEquals(cast.child)
+          case exp => exp.semanticEquals(cast.child)
+        } || isExpressionMatches(a, subsumeeOutputList)
+      case a@Alias(_, _) =>
+        subsumeeOutputList.exists {
+          case Alias(cast: Cast, _) => cast.child.semanticEquals(a.child)
+          case alias: Alias => alias.child.semanticEquals(a.child) ||
+                               alias.sql.equalsIgnoreCase(a.sql)
+          case exp => exp.semanticEquals(a.child)
+        } || isExpressionMatches(a, subsumeeOutputList)
+      case ex@exp =>
+        subsumeeOutputList.exists {
+          case alias: Alias => alias.child.semanticEquals(exp)
+          case expr => expr.semanticEquals(exp)
+        } || isExpressionMatches(ex, subsumeeOutputList)
+    }
+  }
+
+  /**
    * Check if expr1 and expr2 matches TimeSeriesUDF function. If both expressions are
    * timeseries udf functions, then check it's children are same irrespective of case.
    */
@@ -592,6 +622,9 @@ private abstract class MVMatchPattern extends Logging {
  */
 
 private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHelper {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   private def isDerivable(
       exprE: Expression,
       exprListR: Seq[Expression],
@@ -647,6 +680,10 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel
         ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
              sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
 
+        LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the plan: " +
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }. ")
+
         // assume children (including harmonized relation) of subsumer and subsumee
         // are 1-1 correspondence.
         // Change the following two conditions to more complicated ones if we want to
@@ -723,12 +760,8 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel
           val isPredicateEmR = sel_1q.predicateList.forall(expr =>
             sel_1a.predicateList.exists(_.semanticEquals(expr)) ||
             isExpressionMatches(expr, sel_1a.predicateList))
-          val isOutputEmR = sel_1q.outputList.forall(expr =>
-            sel_1a.outputList.exists(_.semanticEquals(expr)) ||
-            isExpressionMatches(expr, sel_1a.outputList))
-          val isOutputRmE = sel_1a.outputList.forall(expr =>
-            sel_1q.outputList.exists(_.semanticEquals(expr)) ||
-            isExpressionMatches(expr, sel_1q.outputList))
+          val isOutputEmR = compareOutputList(sel_1q.outputList, sel_1a.outputList)
+          val isOutputRmE = compareOutputList(sel_1a.outputList, sel_1q.outputList)
           val isLOEmLOR = !(isLeftJoinView(sel_1a) && sel_1q.joinEdges.head.joinType
== Inner)
 
           if (r2eJoinsMatch) {
@@ -803,6 +836,9 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel
         sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
         if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
            sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
+        LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the plan: " +
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }")
         val isPredicateRmE = sel_3a.predicateList.isEmpty ||
                              sel_3a.predicateList.forall(expr =>
                                sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
@@ -856,6 +892,9 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel
 }
 
 private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   def apply(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -866,31 +905,16 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
         gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
         gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
         None) =>
+        LOGGER.debug(s"Applying pattern: {GroupbyGroupbyNoChildDelta} for the plan: " +
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }")
         val isGroupingEmR = gb_2q.predicateList.forall(expr =>
           gb_2a.predicateList.exists(_.semanticEquals(expr)) ||
           isExpressionMatches(expr, gb_2a.predicateList))
         val isGroupingRmE = gb_2a.predicateList.forall(expr =>
           gb_2q.predicateList.exists(_.semanticEquals(expr)) ||
           isExpressionMatches(expr, gb_2q.predicateList))
-        val isOutputEmR = gb_2q.outputList.forall {
-          case Alias(cast: Cast, _) =>
-            gb_2a.outputList.exists {
-              case Alias(castExp: Cast, _) => castExp.child.semanticEquals(cast.child)
-              case alias: Alias => alias.child.semanticEquals(cast.child)
-              case exp => exp.semanticEquals(cast.child)
-            }
-          case a @ Alias(_, _) =>
-            gb_2a.outputList.exists {
-              case Alias(cast: Cast, _) => cast.child.semanticEquals(a.child)
-              case alias: Alias => alias.child.semanticEquals(a.child)
-              case exp => exp.semanticEquals(a.child)
-            }
-          case exp =>
-            gb_2a.outputList.exists {
-              case alias: Alias => alias.child.semanticEquals(exp)
-              case expr => expr.semanticEquals(exp)
-            }
-        }
+        val isOutputEmR = compareOutputList(gb_2q.outputList, gb_2a.outputList)
         if (isGroupingEmR && isGroupingRmE) {
           if (isOutputEmR) {
             // Mappings of output of two plans by checking semantic equals.
@@ -898,7 +922,8 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
               (exp, gb_2q.outputList.find {
                 case a: Alias if exp.isInstanceOf[Alias] =>
                   a.child.semanticEquals(exp.children.head) ||
-                    isExpressionMatches(a.child, exp.children.head)
+                  isExpressionMatches(a.child, exp.children.head) ||
+                  a.sql.equalsIgnoreCase(exp.sql)
                 case a: Alias => a.child.semanticEquals(exp)
                 case other => exp match {
                   case alias: Alias =>
@@ -971,6 +996,9 @@ private object GroupbyGroupbyNoChildDelta extends MVMatchPattern {
 
 private object GroupbyGroupbySelectOnlyChildDelta
   extends MVMatchPattern with PredicateHelper {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   private def isDerivable(
       exprE: Expression,
       exprListR: Seq[Expression],
@@ -1083,6 +1111,11 @@ private object GroupbyGroupbySelectOnlyChildDelta
         true)
         if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
 
+        LOGGER.debug(s"Applying pattern: {GroupbyGroupbySelectOnlyChildDelta} for the plan:
" +
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
+                     s"Compensation: { ${ sel_1c1.toString().trim } }")
+
         val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
         val isGroupingEdR = gb_2q.predicateList.forall(expr =>
           isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
@@ -1169,6 +1202,9 @@ private object GroupbyGroupbySelectOnlyChildDelta
 }
 
 private object GroupbyGroupbyGroupbyChildDelta extends MVMatchPattern {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   def apply(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -1181,6 +1217,9 @@ private object GroupbyGroupbyGroupbyChildDelta extends MVMatchPattern {
         Select(_, _, _, _, _, _, _, _, _, _),
         Select(_, _, _, _, _, _, _, _, _, _),
         true) =>
+        LOGGER.debug(s"Applying pattern: {GroupbyGroupbyGroupbyChildDelta} for the plan:
" +
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }")
         // TODO: implement me
         Nil
 
@@ -1191,6 +1230,9 @@ private object GroupbyGroupbyGroupbyChildDelta extends MVMatchPattern {
 
 
 private object SelectSelectSelectChildDelta extends MVMatchPattern {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   def apply(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -1206,6 +1248,9 @@ private object SelectSelectSelectChildDelta extends MVMatchPattern {
         modular.Select(_, _, _, _, _, _, _, _, _, _),
         modular.Select(_, _, _, _, _, _, _, _, _, _),
         true) =>
+        LOGGER.debug(s"Applying pattern: {SelectSelectSelectChildDelta} for the plan: " +
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }")
         // TODO: implement me
         Nil
       case _ => Nil
@@ -1216,6 +1261,8 @@ private object SelectSelectSelectChildDelta extends MVMatchPattern {
 private object SelectSelectGroupbyChildDelta
   extends MVMatchPattern with PredicateHelper {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   private def isDerivable(
       exprE: Expression,
       exprListR: Seq[Expression],
@@ -1422,6 +1469,10 @@ private object SelectSelectGroupbyChildDelta
         Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
         _ :: Nil,
         _ :: Nil) =>
+        LOGGER.debug(s"Applying pattern: {SelectSelectGroupbyChildDelta} for the plan: "
+
+                     s"{ ${ subsumee.toString().trim } }. " +
+                     s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
+                     s"Compensation: { ${ gb_2c.toString().trim } }")
         val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
         val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode => tbl }
         val distinctSelOList = getDistinctOutputList(sel_3q_dup.outputList)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
index 7818149..e6d17ae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.Breaks.{break, breakable}
 
+import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference,
Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
@@ -33,6 +34,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.DataTypes
 import org.apache.spark.unsafe.types.UTF8String
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum
 import org.apache.carbondata.mv.expressions.modular.{ModularSubquery, ScalarModularSubquery}
 import org.apache.carbondata.mv.plans.modular.{ExpressionHelper, GroupBy, HarmonizedRelation,
LeafNode, Matchable, ModularPlan, ModularRelation, Select, SimpleModularizer}
@@ -49,6 +51,8 @@ import org.apache.carbondata.view.{MVCatalogInSpark, MVPlanWrapper, MVTimeGranul
 class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     session: SparkSession) {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   private def getAliasName(expression: NamedExpression): String = {
     expression match {
       case Alias(_, name) => name
@@ -226,6 +230,8 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
    */
   private def rewrite(modularPlan: ModularPlan): ModularPlan = {
     if (modularPlan.find(_.rewritten).isDefined) {
+      LOGGER.debug(s"Getting updated plan for the rewritten modular plan: " +
+                   s"{ ${ modularPlan.toString().trim } }")
       var updatedPlan = modularPlan transform {
         case select: Select =>
           updatePlan(select)
@@ -397,6 +403,7 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
         if (plan.rewritten || !plan.isSPJGH) {
           plan
         } else {
+          LOGGER.info("Query matching has been initiated with available mv schema's")
           val rewrittenPlans =
             for {schemaWrapper <- catalog.lookupFeasibleSchemas(plan).toStream
                  subsumer <- SimpleModularizer.modularize(
@@ -788,82 +795,110 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
         val plan = planWrapper.modularPlan.asInstanceOf[Select]
         val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan)
         val outputListMapping = groupBy.outputList zip updatedPlanOutputList
-        val outputList = for ((output1, output2) <- outputListMapping) yield {
-          output1 match {
-            case Alias(aggregate@AggregateExpression(function@Sum(_), _, _, _), _) =>
-              val uFun = function.copy(child = output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(function@Max(_), _, _, _), _) =>
-              val uFun = function.copy(child = output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(function@Min(_), _, _, _), _) =>
-              val uFun = function.copy(child = output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@Count(Seq(_)), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(agg@AggregateExpression(_@Corr(_, _), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(agg.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@VariancePop(_), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@VarianceSamp(_), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@StddevSamp(_), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@StddevPop(_), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@CovPopulation(_, _), _, _, _), _)
=>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@CovSample(_, _), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@Skewness(_), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case Alias(aggregate@AggregateExpression(_@Kurtosis(_), _, _, _), _) =>
-              val uFun = Sum(output2)
-              Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
-            case _ =>
-              if (output1.name != output2.name) {
-                Alias(output2, output1.name)(exprId = output1.exprId)
-              } else {
-                output2
-              }
-          }
-        }
-        val updatedPredicates = groupBy.predicateList.map {
-          predicate =>
-            outputListMapping.find {
-              case (output1, _) =>
-                output1 match {
-                  case alias: Alias if predicate.isInstanceOf[Alias] =>
-                    alias.child.semanticEquals(predicate.children.head)
-                  case alias: Alias =>
-                    alias.child.semanticEquals(predicate)
-                  case other =>
-                    other.semanticEquals(predicate)
-                }
-            } match {
-              case Some((_, output2)) => output2
-              case _ => predicate
-            }
-        }
+        val (outputList: Seq[NamedExpression], updatedPredicates: Seq[Expression]) =
+          getUpdatedOutputAndPredicateList(
+          groupBy,
+          outputListMapping)
         groupBy.copy(
           outputList = outputList,
           inputList = plan.outputList,
           predicateList = updatedPredicates,
           child = plan,
           modularPlan = None).setRewritten()
+      case groupBy: GroupBy if groupBy.predicateList.nonEmpty => groupBy.child match {
+        case select: Select if select.modularPlan.isDefined =>
+          val planWrapper = select.modularPlan.get.asInstanceOf[MVPlanWrapper]
+          val plan = planWrapper.modularPlan.asInstanceOf[Select]
+          val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, select.modularPlan)
+          val outputListMapping = groupBy.outputList zip updatedPlanOutputList
+          val (outputList: Seq[NamedExpression], updatedPredicates: Seq[Expression]) =
+            getUpdatedOutputAndPredicateList(
+              groupBy,
+              outputListMapping)
+          groupBy.copy(
+            outputList = outputList,
+            inputList = plan.outputList,
+            predicateList = updatedPredicates,
+            child = select,
+            modularPlan = None)
+        case _ => groupBy
+      }
       case other => other
     }
   }
 
+  private def getUpdatedOutputAndPredicateList(groupBy: GroupBy,
+      outputListMapping: Seq[(NamedExpression, NamedExpression)]):
+  (Seq[NamedExpression], Seq[Expression]) = {
+    val outputList = for ((output1, output2) <- outputListMapping) yield {
+      output1 match {
+        case Alias(aggregate@AggregateExpression(function@Sum(_), _, _, _), _) =>
+          val uFun = function.copy(child = output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(function@Max(_), _, _, _), _) =>
+          val uFun = function.copy(child = output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(function@Min(_), _, _, _), _) =>
+          val uFun = function.copy(child = output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@Count(Seq(_)), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(agg@AggregateExpression(_@Corr(_, _), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(agg.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@VariancePop(_), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@VarianceSamp(_), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@StddevSamp(_), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@StddevPop(_), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@CovPopulation(_, _), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@CovSample(_, _), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@Skewness(_), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case Alias(aggregate@AggregateExpression(_@Kurtosis(_), _, _, _), _) =>
+          val uFun = Sum(output2)
+          Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
+        case _ =>
+          if (output1.name != output2.name) {
+            Alias(output2, output1.name)(exprId = output1.exprId)
+          } else {
+            output2
+          }
+      }
+    }
+    val updatedPredicates = groupBy.predicateList.map {
+      predicate =>
+        outputListMapping.find {
+          case (output1, _) =>
+            output1 match {
+              case alias: Alias if predicate.isInstanceOf[Alias] =>
+                alias.child.semanticEquals(predicate.children.head)
+              case alias: Alias =>
+                alias.child.semanticEquals(predicate)
+              case other =>
+                other.semanticEquals(predicate)
+            }
+        } match {
+          case Some((_, output2)) => output2
+          case _ => predicate
+        }
+    }
+    (outputList, updatedPredicates)
+  }
+
   /**
    * Updates the flagspec of given select plan with attributes of relation select plan
    */
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
index c288d40..10b793b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.optimizer
 
 import scala.collection.JavaConverters._
 
+import org.apache.log4j.Logger
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
@@ -29,7 +30,6 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.core.view.{MVCatalog, MVCatalogFactory}
 import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
 import org.apache.carbondata.view.{MVCatalogInSpark, MVManagerInSpark, MVSchemaWrapper}
 import org.apache.carbondata.view.MVFunctions.DUMMY_FUNCTION
@@ -39,6 +39,8 @@ import org.apache.carbondata.view.MVFunctions.DUMMY_FUNCTION
  */
 class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
 
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
   override def apply(logicalPlan: LogicalPlan): LogicalPlan = {
     // only query need to check this rule
     logicalPlan match {
@@ -51,7 +53,11 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
     } catch {
       case e =>
         // if exception is thrown while rewriting the query, will fallback to original query plan.
-        MVRewriteRule.LOGGER.warn("Failed to rewrite plan with mv: " + e.getMessage)
+        MVRewriteRule.LOGGER
+          .warn("Failed to rewrite plan with mv. Enable debug log to check the Exception")
+        if (MVRewriteRule.LOGGER.isDebugEnabled) {
+          MVRewriteRule.LOGGER.debug(e.getMessage)
+        }
         logicalPlan
     }
   }
@@ -88,6 +94,9 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
           canApply = false
         }
         Aggregate(groupBy, aggregations, child)
+      case localRelation@LocalRelation(_, _, _) =>
+        canApply = false
+        localRelation
     }
     if (!canApply) {
       return logicalPlan
@@ -97,6 +106,8 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
     }
     val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
     if (viewCatalog != null && hasSuitableMV(logicalPlan, viewCatalog)) {
+      LOGGER.debug(s"Query Rewrite has been initiated for the plan: " +
+                   s"${ logicalPlan.toString().trim }")
       val viewRewrite = new MVRewrite(viewCatalog, logicalPlan, session)
       val rewrittenPlan = viewRewrite.rewrittenPlan
       if (rewrittenPlan.find(_.rewritten).isDefined) {
@@ -131,7 +142,7 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
    */
   private def rewriteFunctionWithQualifierName(modularPlan: ModularPlan): String = {
     val compactSQL = modularPlan.asCompactSQL
-    modularPlan match {
+    val finalCompactSQL = modularPlan match {
       case select: Select =>
         var outputColumn = ""
         select.outputList.collect {
@@ -154,6 +165,8 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
       case _ =>
         compactSQL
     }
+    LOGGER.debug(s"Rewritten Query: { ${finalCompactSQL.trim} }")
+    finalCompactSQL
   }
 
   /**
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
index 891e36d..a4aa1c5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
@@ -414,6 +414,40 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop materialized view mv32")
   }
 
+  test("test create materialized view having sub-query alias used in projection") {
+    sql("drop materialized view if exists mv_sub")
+    val subQuery = "select empname, sum(result) sum_ut from " +
+                   "(select empname, utilization result from fact_table1) fact_table1 " +
+                   "group by empname"
+    sql("create materialized view mv_sub as " + subQuery)
+    val frame = sql(subQuery)
+    assert(TestUtil.verifyMVHit(frame.queryExecution.optimizedPlan, "mv_sub"))
+    checkAnswer(frame,
+      sql("select empname, sum(result) ut from (select empname, utilization result from "
+
+          "fact_table2) fact_table2 group by empname"))
+    sql(s"drop materialized view mv_sub")
+  }
+
+  test("test create materialized view used as sub-query in actual query") {
+    sql("drop materialized view if exists mv_sub")
+    sql("create materialized view mv_sub as select empname, utilization result from fact_table1")
+    val df1 = sql("select empname, sum(result) sum_ut from " +
+                  "(select empname, utilization result from fact_table1) fact_table1 " +
+                  "group by empname")
+    val df2 = sql("select emp, sum(result) sum_ut from " +
+                  "(select empname emp, utilization result from fact_table1) fact_table1
" +
+                  "group by emp")
+    assert(TestUtil.verifyMVHit(df1.queryExecution.optimizedPlan, "mv_sub"))
+    assert(TestUtil.verifyMVHit(df2.queryExecution.optimizedPlan, "mv_sub"))
+    checkAnswer(df1,
+      sql("select empname, sum(result) ut from (select empname, utilization result from "
+
+          "fact_table2) fact_table2 group by empname"))
+    checkAnswer(df2,
+      sql("select emp, sum(result) ut from (select empname emp, utilization result from "
+
+          "fact_table2) fact_table2 group by emp"))
+    sql(s"drop materialized view mv_sub")
+  }
+
   test("test create materialized view with simple and sub group by query with filter on materialized
view") {
     sql("create materialized view mv15 as select empname, sum(utilization) from fact_table1
where empname='shivani' group by empname")
     val frame = sql(


Mime
View raw message