carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2686] Implement Left join on MV datamap
Date Fri, 06 Jul 2018 13:42:22 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master cb10d03a7 -> 21b56dfd8


[CARBONDATA-2686] Implement Left join on MV datamap

This closes #2444


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/21b56dfd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/21b56dfd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/21b56dfd

Branch: refs/heads/master
Commit: 21b56dfd84cb7ad207b8f0122adbed2ffdbb3e72
Parents: cb10d03
Author: eychih <eychih@hotmail.com>
Authored: Thu May 31 19:10:34 2018 -0700
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Jul 6 21:42:08 2018 +0800

----------------------------------------------------------------------
 .../mv/rewrite/DefaultMatchMaker.scala          | 62 ++++++++++-----
 .../carbondata/mv/rewrite/Navigator.scala       | 20 +++--
 .../mv/rewrite/SummaryDatasetCatalog.scala      | 19 +++--
 .../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala | 10 +--
 .../rewrite/matching/TestTPCDS_1_4_Batch.scala  | 64 ++++++++++++++-
 .../mv/plans/modular/Harmonizer.scala           | 30 +++++--
 .../mv/plans/modular/ModularPlan.scala          |  4 +-
 .../mv/plans/modular/ModularRelation.scala      | 82 +++++++++++++++++---
 .../mv/testutil/Tpcds_1_4_QueryBatch.scala      | 43 ++++++++++
 .../mv/testutil/Tpcds_1_4_Tables.scala          | 14 ++++
 10 files changed, 292 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 6dbf236..6bb1c6e 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -19,11 +19,10 @@
 package org.apache.carbondata.mv.rewrite
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
-import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
 
-import org.apache.carbondata.mv.datamap.MVHelper
+import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
 import org.apache.carbondata.mv.plans.modular
-import org.apache.carbondata.mv.plans.modular._
 import org.apache.carbondata.mv.plans.modular.Flags._
 import org.apache.carbondata.mv.plans.util.SQLBuilder
 
@@ -127,8 +126,22 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
     }
   }
 
-  def apply(
-      subsumer: ModularPlan,
+  private def isLeftJoinView(subsumer: ModularPlan): Boolean = {
+    subsumer match {
+      case sel: Select
+        if sel.joinEdges.length == 1 &&
+           sel.joinEdges.head.joinType == LeftOuter &&
+           sel.children(1).isInstanceOf[HarmonizedRelation] =>
+        val hDim = sel.children(1).asInstanceOf[HarmonizedRelation]
+        hDim.tag match {
+          case Some(tag) => sel.outputList.contains(tag)
+          case None => false
+        }
+      case _ => false
+    }
+  }
+
+  def apply(subsumer: ModularPlan,
       subsumee: ModularPlan,
       compensation: Option[ModularPlan],
       rewrite: QueryRewrite): Seq[ModularPlan] = {
@@ -173,11 +186,15 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
                 case (Some(l), Some(r)) =>
                   val mappedEdge = JoinEdge(l, r, x.joinType)
                   val joinTypeEquivalent =
-                    if (sel_1q.joinEdges.contains(mappedEdge)) true
-                    else {
+                    if (sel_1q.joinEdges.contains(mappedEdge)) {
+                      true
+                    } else {
                       x.joinType match {
                         case Inner | FullOuter =>
                           sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
+                        case LeftOuter if isLeftJoinView(sel_1a) =>
+                          sel_1q.joinEdges.contains(JoinEdge(l, r, Inner)) ||
+                          sel_1q.joinEdges.contains(JoinEdge(r, l, Inner))
                         case _ => false
                       }
                     }
@@ -201,11 +218,13 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
             sel_1a.outputList.exists(_.semanticEquals(expr)))
           val isOutputRmE = sel_1a.outputList.forall(expr =>
             sel_1q.outputList.exists(_.semanticEquals(expr)))
+          val isLOEmLOR = !(isLeftJoinView(sel_1a) && sel_1q.joinEdges.head.joinType == Inner)
 
           if (r2eJoinsMatch) {
-            if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty) {
-              Seq(sel_1a) // no compensation needed
+            if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
+              Seq(sel_1a)
             } else {
+              // no compensation needed
               val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
               val tAliasMap = new collection.mutable.HashMap[Int, String]()
 
@@ -245,15 +264,22 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
                   }
               }
               val tPredicateList = sel_1q.predicateList.filter { p =>
-                !sel_1a.predicateList.exists(_.semanticEquals(p)) }
-                val wip = sel_1q.copy(
-                  predicateList = tPredicateList,
-                  children = tChildren,
-                  joinEdges = tJoinEdges.filter(_ != null),
-                  aliasMap = tAliasMap.toMap)
-
-                val done = factorOutSubsumer(wip, usel_1a, wip.aliasMap)
-                Seq(done)
+                !sel_1a.predicateList.exists(_.semanticEquals(p))
+              } ++ (if (isLeftJoinView(sel_1a) &&
+                        sel_1q.joinEdges.head.joinType == Inner) {
+                sel_1a.children(1)
+                  .asInstanceOf[HarmonizedRelation].tag.map(IsNotNull(_)).toSeq
+              } else {
+                Seq.empty
+              })
+              val sel_1q_temp = sel_1q.copy(
+                predicateList = tPredicateList,
+                children = tChildren,
+                joinEdges = tJoinEdges.filter(_ != null),
+                aliasMap = tAliasMap.toMap)
+
+              val done = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap)
+              Seq(done)
             }
           } else Nil
         } else Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index a36988a..5dc2245 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -50,7 +50,7 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
           val compensation =
             (for { dataset <- catalog.lookupFeasibleSummaryDatasets(currentFragment).toStream
                    subsumer <- session.sessionState.modularizer.modularize(
-                     session.sessionState.optimizer.execute(dataset.plan)) // .map(_.harmonized)
+                     session.sessionState.optimizer.execute(dataset.plan)).map(_.semiHarmonized)
                    subsumee <- unifySubsumee(currentFragment)
                    comp <- subsume(
                      unifySubsumer2(
@@ -168,17 +168,25 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
     val pairs = for {
       rtable <- rtables
       etable <- etables
-      if (rtable == etable)
+      if rtable == etable
     } yield (rtable, etable)
 
     pairs.foldLeft(subsumer) {
       case (curSubsumer, pair) =>
-        val nxtSubsumer = curSubsumer.transform { case pair._1 => pair._2 }
-        val attributeSet = AttributeSet(pair._1.output)
-        val rewrites = AttributeMap(pair._1.output.zip(pair._2.output))
+        val mappedOperator =
+          if (pair._1.isInstanceOf[modular.HarmonizedRelation] &&
+              pair._1.asInstanceOf[modular.HarmonizedRelation].hasTag) {
+          pair._2.asInstanceOf[modular.HarmonizedRelation].addTag
+        } else {
+          pair._2
+        }
+        val nxtSubsumer = curSubsumer.transform { case pair._1 => mappedOperator }
+        // val attributeSet = AttributeSet(pair._1.output)
+        // reverse first due to possible tag for left join
+        val rewrites = AttributeMap(pair._1.output.zip(mappedOperator.output))
         nxtSubsumer.transformUp {
           case p => p.transformExpressions {
-            case a: Attribute if attributeSet contains a => rewrites(a).withQualifier(a.qualifier)
+            case a: Attribute if rewrites contains a => rewrites(a).withQualifier(a.qualifier)
           }
         }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 3b5930f..a7dbdef 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -101,7 +101,7 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
       val planToRegister = MVHelper.dropDummFuc(query.queryExecution.analyzed)
       val modularPlan =
         mvSession.sessionState.modularizer.modularize(
-          mvSession.sessionState.optimizer.execute(planToRegister)).next().harmonized
+          mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
       val signature = modularPlan.signature
       val identifier = dataMapSchema.getRelationIdentifier
       val output = new FindDataSourceTable(sparkSession).apply(sparkSession.sessionState.catalog
@@ -141,6 +141,8 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
   override def listAllSchema(): Array[SummaryDataset] = summaryDatasets.toArray
 
   /**
+   * API for test only
+   *
    * Registers the data produced by the logical representation of the given [[DataFrame]]. Unlike
    * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
    * the in-memory columnar representation of the underlying table is expensive.
@@ -155,8 +157,7 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
       } else {
         val modularPlan =
           mvSession.sessionState.modularizer.modularize(
-            mvSession.sessionState.optimizer.execute(planToRegister)).next()
-        // .harmonized
+            mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized
         val signature = modularPlan.signature
         summaryDatasets +=
         SummaryDataset(signature, planToRegister, null, null)
@@ -174,8 +175,10 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
     }
   }
 
-  /** Tries to remove the data set for the given [[DataFrame]] from the catalog if it's
-   * registered */
+  /**
+   * API for test only
+   * Tries to remove the data set for the given [[DataFrame]] from the catalog if it's registered
+   */
   private[mv] def tryUnregisterSummaryDataset(
       query: DataFrame,
       blocking: Boolean = true): Boolean = {
@@ -214,7 +217,11 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
       val enabledDataSets = summaryDatasets.filter { p =>
         statusDetails.exists(_.getDataMapName.equalsIgnoreCase(p.dataMapSchema.getDataMapName))
       }
-      val feasible = enabledDataSets.filter { x =>
+
+      //  ****not sure what enabledDataSets is used for ****
+      //  can enable/disable datamap move to other place ?
+      //    val feasible = enabledDataSets.filter { x =>
+      val feasible = summaryDatasets.filter { x =>
         (x.signature, sig) match {
           case (Some(sig1), Some(sig2)) =>
            if (sig1.groupby && sig2.groupby && sig1.datasets.subsetOf(sig2.datasets)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
index 76e0455..882a43a 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.mv.testutil.ModularPlanTest
 //import org.apache.spark.sql.catalyst.SQLBuilder
 import java.io.{File, PrintWriter}
 
-class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter { 
+class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {
   import org.apache.carbondata.mv.rewrite.matching.TestTPCDS_1_4_Batch._
   import org.apache.carbondata.mv.testutil.Tpcds_1_4_Tables._
 
@@ -33,7 +33,7 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {
   val testHive = sqlContext.sparkSession
   val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient()
 
-  ignore("test using tpc-ds queries") {
+  test("test using tpc-ds queries") {
 
     tpcds1_4Tables.foreach { create_table =>
       hiveClient.runSqlHive(create_table)
@@ -45,10 +45,10 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {
 //    val dest = "case_33"
 // case_15 and case_16 need revisit
 
-    val dest = "case_29"   /** to run single case, uncomment out this **/
+    val dest = "case_39"   /** to run single case, uncomment out this **/
     
     tpcds_1_4_testCases.foreach { testcase =>
-//      if (testcase._1 == dest) { /** to run single case, uncomment out this **/
+      if (testcase._1 == dest) { /** to run single case, uncomment out this **/
         val mvSession = new SummaryDatasetCatalog(testHive)
         val summaryDF = testHive.sql(testcase._2)
         mvSession.registerSummaryDataset(summaryDF)
@@ -75,7 +75,7 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter {
           writer.print(s"\n\n==== rewritten query ====\n\n${rewriteSQL}\n")
         }
 
-//        }  /**to run single case, uncomment out this **/
+        }  /**to run single case, uncomment out this **/
     
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestTPCDS_1_4_Batch.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestTPCDS_1_4_Batch.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestTPCDS_1_4_Batch.scala
index e564052..23b1554 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestTPCDS_1_4_Batch.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestTPCDS_1_4_Batch.scala
@@ -2836,7 +2836,69 @@ object TestTPCDS_1_4_Batch {
         |    GROUP BY gen_subquery_2.`a3600`, gen_subquery_2.`a12575873557`, gen_subquery_2.`a12575847251`,
gen_subquery_2.`a12575903189`, gen_subquery_2.`a204010101`) gen_subquery_3 
         |  GROUP BY gen_subquery_3.`a3600`, gen_subquery_3.`a12575873557`, gen_subquery_3.`a12575847251`,
gen_subquery_3.`a12575903189`) gen_subquery_4 
         |ORDER BY gen_subquery_4.`START_TIME` ASC NULLS FIRST
+        """.stripMargin.trim),
+      ("case_38",
+       """
+        |SELECT p.id, SUM(s.ss_sales_price) sum, p.prod_name, s.ss_ext_list_price
+        |FROM store_sales s
+        |LEFT JOIN (
+        |  SELECT 1 id, i_item_id, FIRST(i_product_name) prod_name
+        |  FROM item 
+        |  GROUP BY i_item_id) p
+        |ON s.ss_item_sk = p.i_item_id
+        |GROUP BY p.id, p.prod_name, s.ss_ext_list_price
+        """.stripMargin.trim,
+       """
+        |SELECT SUM(s.ss_sales_price), p.prod_name
+        |FROM store_sales s
+        |LEFT JOIN (
+        |  SELECT i_item_id, FIRST(i_product_name) prod_name
+        |  FROM item 
+        |  GROUP BY i_item_id) p
+        |ON s.ss_item_sk = p.i_item_id
+        |WHERE s.ss_ext_list_price > 20.0
+        |GROUP BY p.prod_name
+        """.stripMargin.trim,
+       """ 
+        |
+        |
+        """.stripMargin.trim),
+      ("case_39",
+       """
+        |SELECT p.gen_tag_id, SUM(s.ss_sales_price) sum, p.prod_name, p.prod_size
+        |FROM store_sales s
+        |LEFT JOIN (
+        |  SELECT 1 gen_tag_id, i_item_id, FIRST(i_product_name) prod_name, FIRST(i_size)
prod_size
+        |  FROM item 
+        |  GROUP BY i_item_id) p
+        |ON s.ss_item_sk = p.i_item_id
+        |GROUP BY p.gen_tag_id, p.prod_name, p.prod_size
+        """.stripMargin.trim,
+       """
+        |SELECT SUM(s.ss_sales_price), p.prod_name
+        |FROM store_sales s
+        |INNER JOIN (
+        |  SELECT i_item_id, FIRST(i_product_name) prod_name, FIRST(i_size) prod_size
+        |  FROM item 
+        |  GROUP BY i_item_id) p
+        |ON s.ss_item_sk = p.i_item_id
+        |WHERE p.prod_size = 'small'
+        |GROUP BY p.prod_name
+        """.stripMargin.trim,
+       """ 
+        |SELECT sum(gen_subsumer_0.`sum`) AS `sum(ss_sales_price)`, gen_subsumer_0.`prod_name`

+        |FROM
+        |  (SELECT p.`gen_tag_id`, sum(s.`ss_sales_price`) AS `sum`, p.`prod_name`, p.`prod_size`

+        |  FROM
+        |    store_sales s 
+        |    LEFT OUTER JOIN (SELECT 1 AS `gen_tag_id`, item.`i_item_id`, first(item.`i_product_name`,
false) AS `prod_name`, first(item.`i_size`, false) AS `prod_size` 
+        |    FROM
+        |      item
+        |    GROUP BY item.`i_item_id`) p  ON (s.`ss_item_sk` = CAST(p.`i_item_id` AS INT))
+        |  GROUP BY p.`gen_tag_id`, p.`prod_name`, p.`prod_size`) gen_subsumer_0 
+        |WHERE
+        |  (gen_subsumer_0.`prod_size` = 'small') AND (gen_subsumer_0.`gen_tag_id` IS NOT
NULL)
+        |GROUP BY gen_subsumer_0.`prod_name`
         """.stripMargin.trim)
   )
-  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
index 91201a1..ebe8c8c 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -36,17 +36,32 @@ abstract class Harmonizer(conf: SQLConf)
   def batches: Seq[Batch] = {
     Batch(
       "Data Harmonizations", fixedPoint,
-      HarmonizeDimensionTable,
-      HarmonizeFactTable) :: Nil
+      Seq( HarmonizeDimensionTable) ++
+      extendedOperatorHarmonizationRules: _*) :: Nil
   }
+
+  /**
+   * Override to provide additional rules for the modular operator harmonization batch.
+   */
+  def extendedOperatorHarmonizationRules: Seq[Rule[ModularPlan]] = Nil
+}
+
+/**
+ * A full Harmonizer - harmonize both fact and dimension tables
+ */
+object FullHarmonizer extends FullHarmonizer
+
+class FullHarmonizer extends Harmonizer(new SQLConf()) {
+  override def extendedOperatorHarmonizationRules: Seq[Rule[ModularPlan]] =
+    super.extendedOperatorHarmonizationRules ++ (HarmonizeFactTable :: Nil)
 }
 
 /**
- * An default Harmonizer
+ * A semi Harmonizer - it harmonizes dimension tables only
  */
-object DefaultHarmonizer extends DefaultHarmonizer
+object SemiHarmonizer extends SemiHarmonizer
 
-class DefaultHarmonizer extends Harmonizer(new SQLConf())
+class SemiHarmonizer extends Harmonizer(new SQLConf())
 
 object HarmonizeDimensionTable extends Rule[ModularPlan] with PredicateHelper {
 
@@ -68,8 +83,9 @@ object HarmonizeDimensionTable extends Rule[ModularPlan] with PredicateHelper
{
           s1@Select(_, _, _, _, _, dim :: Nil, NoFlags, Nil, Nil, _),
           NoFlags,
           Nil, _) if (dim.isInstanceOf[ModularRelation]) => {
-            val rAliasMap = AttributeMap(h.outputList
-              .collect { case a: Alias => (a.child.asInstanceOf[Attribute], a.toAttribute)
})
+            val rAliasMap = AttributeMap(h.outputList.collect {
+                case a: Alias  if a.child.isInstanceOf[Attribute] =>
+                (a.child.asInstanceOf[Attribute], a.toAttribute) })
             val pullUpPredicates = s1.predicateList
               .map(replaceAlias(_, rAliasMap.asInstanceOf[AttributeMap[Expression]]))
             if (pullUpPredicates.forall(cond => canEvaluate(cond, h))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index c66df19..1420522 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -166,7 +166,9 @@ abstract class ModularPlan
    *
    * Some nodes should overwrite this to provide proper harmonization logic.
    */
-  lazy val harmonized: ModularPlan = DefaultHarmonizer.execute(preHarmonized)
+  lazy val harmonized: ModularPlan = FullHarmonizer.execute(preHarmonized)
+
+  lazy val semiHarmonized: ModularPlan = SemiHarmonizer.execute(preHarmonized)
 
   /**
    * Do some simple transformation on this plan before harmonizing. Implementations can override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
index dec0c48..3349835 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -18,7 +18,8 @@
 package org.apache.carbondata.mv.plans.modular
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference,
NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference,
Expression, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -85,11 +86,27 @@ object HarmonizedRelation {
       _,
       Select(_, _, _, _, _, dim :: Nil, NoFlags, Nil, Nil, _),
       NoFlags,
-      Nil, _) if (dim.isInstanceOf[ModularRelation]) =>
-        if (g.outputList
-          .forall(col => col.isInstanceOf[AttributeReference] ||
-                         (col.isInstanceOf[Alias] &&
-                          col.asInstanceOf[Alias].child.isInstanceOf[AttributeReference])))
{
+      Nil, _) if dim.isInstanceOf[ModularRelation] =>
+        val check =
+          g.outputList.forall {
+            case alias: Alias =>
+              alias.child.isInstanceOf[AttributeReference] ||
+              alias.child.isInstanceOf[Literal] ||
+              (alias.child match {
+                case AggregateExpression(First(_, _), _, _, _) => true
+                case AggregateExpression(Last(_, _), _, _, _) => true
+                case _ => false
+              })
+            case col =>
+              col.isInstanceOf[AttributeReference] ||
+              col.isInstanceOf[Literal] ||
+              (col.asInstanceOf[Expression] match {
+                case AggregateExpression(First(_, _), _, _, _) => true
+                case AggregateExpression(Last(_, _), _, _, _) => true
+                case _ => false
+              })
+        }
+        if (check) {
           true
         } else {
           false
@@ -106,6 +123,14 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
     .tableName
   lazy val databaseName = source.asInstanceOf[GroupBy].child.children(0)
     .asInstanceOf[ModularRelation].databaseName
+  lazy val tag: Option[Attribute] = {
+    source match {
+      case GroupBy(head :: tail, _, _, _, _, _, _, _)
+        if (head.isInstanceOf[Alias] && head.asInstanceOf[Alias].child == Literal(1)) =>
+        Some(head.toAttribute)
+      case _ => None
+    }
+  }
 
   //  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = source.stats
   // (spark, conf)
@@ -117,15 +142,19 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode
{
     val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
       table => AttributeMap(table.output.zip(output))
     }
-    val rewrites = mapSeq(0)
+    val rewrites = mapSeq.head
     val aliasMap = AttributeMap(
       source.asInstanceOf[GroupBy].outputList.collect {
-        case a: Alias if (a.child.isInstanceOf[Attribute]) => (a.child.asInstanceOf[Attribute],
a
-          .toAttribute)
+        case a@Alias(ar: Attribute, _) => (ar, a.toAttribute)
+        case a@Alias(AggregateExpression(First(ar: Attribute, _), _, _, _), _) =>
+          (ar, a.toAttribute)
+        case a@Alias(AggregateExpression(Last(ar: Attribute, _), _, _, _), _) =>
+          (ar, a.toAttribute)
       })
     val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1),
pair._2) }
-    val attributeStats = AttributeMap(aStatsIterator
-      .map(pair => ((aliasMap.get(pair._1)).getOrElse(pair._1), pair._2)).toSeq)
+    val attributeStats =
+      AttributeMap(
+        aStatsIterator.map(pair => (aliasMap.get(pair._1).getOrElse(pair._1), pair._2)).toSeq)
 
     Statistics(stats.sizeInBytes, None, attributeStats, stats.hints)
   }
@@ -135,9 +164,38 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
   // two harmonized modular relations are equal only if orders of output columns of
   // their source plans are exactly the same
   override def equals(that: Any): Boolean = {
+    def canonicalize(source: ModularPlan): ModularPlan = {
+      source.transform {
+        case gb@GroupBy(head :: tail, _, _, _, _, _, _, _)
+          if head.isInstanceOf[Alias] && head.asInstanceOf[Alias].child == Literal(1)
=>
+          gb.copy(outputList = tail)
+      }
+    }
+
     that match {
-      case that: HarmonizedRelation => source.sameResult(that.source)
+      case that: HarmonizedRelation =>
+        val s1 = canonicalize(source)
+        val s2 = canonicalize(that.source)
+        val r = s1.sameResult(s2)
+        r
       case _ => false
     }
   }
+
+  def addTag: HarmonizedRelation = {
+    val source1 = source.transform { case gb@GroupBy(head :: tail, _, _, _, _, _, _, _)
+      if !head.isInstanceOf[Alias] || !(head.asInstanceOf[Alias].child == Literal(1)) =>
+      val exprId = NamedExpression.newExprId
+      gb.copy(outputList = Alias(Literal(1), s"gen_tag_${ exprId.id }")(exprId) +: gb.outputList)
+    }
+    HarmonizedRelation(source1)
+  }
+
+  def hasTag: Boolean = {
+    tag match {
+      case Some(_) => true
+      case None => false
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_QueryBatch.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_QueryBatch.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_QueryBatch.scala
index 8262dfa..e6c0223 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_QueryBatch.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_QueryBatch.scala
@@ -4269,6 +4269,49 @@ object Tpcds_1_4_QueryBatch {
         | GROUP BY channel, d_year, d_qoy, i_category
         | ORDER BY channel, d_year, d_qoy, i_category
         | limit 100
+      """.stripMargin),
+    ("qJoin",
+      """
+        |    SELECT
+        |        i_category,
+        |        ss_ext_sales_price ext_sales_price
+        |    FROM store_sales, item
+        |    WHERE ss_item_sk=i_item_sk
+      """.stripMargin),
+    ("qJoin1",
+      """
+        |  SELECT gen_subsumer_0.`i_category`
+        |  FROM
+        |    (SELECT
+        |         item.`i_category`,
+        |         store_sales.`ss_ext_sales_price` ext_sales_price
+        |     FROM store_sales, item
+        |     WHERE store_sales.`ss_item_sk`=item.`i_item_sk`) gen_subsumer_0
+      """.stripMargin),
+    ("qCarbon",
+      """
+        |SELECT gen_subsumer_0.`c1` AS `c1`, gen_subsumer_0.`designation`
+        |FROM
+        |  (SELECT t1.`empname` AS `c1`, t2.`designation`, t2.`empname` AS `c2`
+        |  FROM
+        |    fact_table1 t1
+        |    INNER JOIN fact_table2 t2  ON (t1.`empname` = t2.`empname`)) gen_subsumer_0
+      """.stripMargin),
+    ("qCarbon1",
+      """
+        |  SELECT t1.`empname` AS `c1`, t2.`designation`, t2.`empname` AS `c2`
+        |  FROM
+        |    fact_table1 t1
+        |    INNER JOIN fact_table2 t2  ON (t1.`empname` = t2.`empname`)
+      """.stripMargin),
+    ("qLeftJoin",
+      """
+        |    SELECT
+        |        i_category,
+        |        ss_ext_sales_price ext_sales_price
+        |    FROM store_sales
+        |         Left Join item
+        |         ON ss_item_sk=i_item_sk
       """.stripMargin)
   )
   // .map { case (name, sqlText) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/21b56dfd/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
index 175b319..01ca448 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala
@@ -840,6 +840,20 @@ object Tpcds_1_4_Tables {
        | newdate       string   ,
        | minnewdate    string   )
        |STORED AS parquet
+          """.stripMargin.trim,
+    s"""
+       | CREATE TABLE fact_table1 (empname String, designation String, doj Timestamp,
+       |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+       |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int,
+       |  utilization int,salary int)
+       |STORED AS parquet
+          """.stripMargin.trim,
+    s"""
+       | CREATE TABLE fact_table2 (empname String, designation String, doj Timestamp,
+       |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+       |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int,
+       |  utilization int,salary int)
+       |STORED AS parquet
           """.stripMargin.trim
   )
 }


Mime
View raw message