carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3291] Fix that MV datamap doesn't take affect when the same table join
Date Sun, 19 May 2019 15:56:26 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0482983  [CARBONDATA-3291] Fix that MV datamap doesn't take affect when the same
table join
0482983 is described below

commit 04829839452d7f56954219b56a6e515239effe61
Author: qiuchenjian <807169000@qq.com>
AuthorDate: Wed Feb 13 20:32:42 2019 +0800

    [CARBONDATA-3291] Fix that MV datamap doesn't take affect when the same table join
    
    [Problem]
    MV datamap doesn't take effect when the same table is joined more than once.
    For the failing scenario, see the test case.
    
    This closes #3125
---
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  | 15 +++-
 .../apache/carbondata/mv/rewrite/Navigator.scala   | 51 +++++++++---
 .../mv/rewrite/MVMultiJoinTestCase.scala           | 94 ++++++++++++++++++++++
 .../mv/plans/modular/ModularRelation.scala         | 15 ++++
 4 files changed, 160 insertions(+), 15 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index cc5cc7b..59d72f8 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -162,8 +162,14 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
         // are 1-1 correspondence.
         // Change the following two conditions to more complicated ones if we want to
         // consider things that combine extrajoin, rejoin, and harmonized relations
-        val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ ==
x) != 1 }
-        val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ ==
x) != 1 }
+        val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count{
+          case relation: ModularRelation => relation.fineEquals(x)
+          case other => other == x
+        } != 1 }
+        val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count{
+          case relation: ModularRelation => relation.fineEquals(x)
+          case other => other == x
+        } != 1 }
 
         val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child)
}
         val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child)
}
@@ -180,7 +186,10 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
             isPredicateEmdR && isOutputEdR) {
           val mappings = sel_1a.children.zipWithIndex.map {
             case (childr, fromIdx) if sel_1q.children.contains(childr) =>
-              val toIndx = sel_1q.children.indexWhere(_ == childr)
+              val toIndx = sel_1q.children.indexWhere{
+                case relation: ModularRelation => relation.fineEquals(childr)
+                case other => other == childr
+              }
               (toIndx -> fromIdx)
 
           }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index 76df4c2..905cd17 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -17,11 +17,11 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 
 import org.apache.carbondata.mv.expressions.modular._
-import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
 import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular._
 import org.apache.carbondata.mv.session.MVSession
 
 private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) {
@@ -146,21 +146,27 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session:
MVSession)
     val rtables = subsumer.collect { case n: modular.LeafNode => n }
     val etables = subsumee.collect { case n: modular.LeafNode => n }
     val pairs = for {
-      rtable <- rtables
-      etable <- etables
-      if rtable == etable
-    } yield (rtable, etable)
+      i <- rtables.indices
+      j <- etables.indices
+      if rtables(i) == etables(j) && reTablesJoinMatched(
+        rtables(i), etables(j), subsumer, subsumee, i, j
+      )
+    } yield (rtables(i), etables(j))
 
     pairs.foldLeft(subsumer) {
       case (curSubsumer, pair) =>
         val mappedOperator =
-          if (pair._1.isInstanceOf[modular.HarmonizedRelation] &&
-              pair._1.asInstanceOf[modular.HarmonizedRelation].hasTag) {
-          pair._2.asInstanceOf[modular.HarmonizedRelation].addTag
-        } else {
-          pair._2
+          pair._1 match {
+            case relation: HarmonizedRelation if relation.hasTag =>
+              pair._2.asInstanceOf[HarmonizedRelation].addTag
+            case _ =>
+              pair._2
+          }
+        val nxtSubsumer = curSubsumer.transform {
+          case node: ModularRelation if node.fineEquals(pair._1) => mappedOperator
+          case pair._1 if !pair._1.isInstanceOf[ModularRelation] => mappedOperator
         }
-        val nxtSubsumer = curSubsumer.transform { case pair._1 => mappedOperator }
+
         // val attributeSet = AttributeSet(pair._1.output)
         // reverse first due to possible tag for left join
         val rewrites = AttributeMap(pair._1.output.zip(mappedOperator.output))
@@ -171,4 +177,25 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session:
MVSession)
         }
     }
   }
+
+  /**
+   * Decides whether a pair of leaf tables, one from the subsumer and one from the
+   * subsumee, may be mapped onto each other. This is needed when the same table is
+   * joined more than once, where equal table names alone are ambiguous.
+   *
+   * On each side, a plan node whose children contain the table is looked up with
+   * `find`. When both nodes are [[Select]]s that share at least one join edge, the
+   * pair is accepted only if some shared edge has both `rIndex` and `eIndex` as its
+   * left endpoint, or both as its right endpoint. In every other situation the pair
+   * is accepted.
+   *
+   * @param rtable   leaf table taken from the subsumer
+   * @param etable   leaf table taken from the subsumee
+   * @param subsumer plan searched for the parent of `rtable`
+   * @param subsumee plan searched for the parent of `etable`
+   * @param rIndex   index associated with `rtable`, compared with join edge endpoints
+   * @param eIndex   index associated with `etable`, compared with join edge endpoints
+   * @return false only when shared join edges exist and none of them has both
+   *         indices on the same side; true otherwise
+   */
+  def reTablesJoinMatched(rtable: modular.LeafNode, etable: modular.LeafNode,
+                          subsumer: ModularPlan, subsumee: ModularPlan,
+                          rIndex: Int, eIndex: Int): Boolean = {
+    // No type test on (rtable, etable): a `_: (ModularRelation, ModularRelation)`
+    // pattern is erased at runtime and accepts every pair, so the join edge check
+    // has always run for all leaf kinds. That behaviour is kept.
+    val parents = for {
+      rtableParent <- subsumer.find(_.children.contains(rtable))
+      etableParent <- subsumee.find(_.children.contains(etable))
+    } yield (rtableParent, etableParent)
+
+    parents match {
+      case Some((rSelect: Select, eSelect: Select)) =>
+        val sharedJoinEdges = eSelect.joinEdges intersect rSelect.joinEdges
+        sharedJoinEdges.isEmpty || sharedJoinEdges.exists { edge =>
+          edge.left == rIndex && edge.left == eIndex ||
+            edge.right == rIndex && edge.right == eIndex
+        }
+      // A table without a parent, or whose parent is not a Select, has no join edges
+      // to compare: accept the pair instead of failing with NoSuchElementException
+      // or MatchError.
+      case _ => true
+    }
+  }
 }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
new file mode 100644
index 0000000..bfd621d
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Query tests checking that an MV datamap is used for queries in which the same
+ * table takes part in a join more than once.
+ */
+class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll {
+
+  /** Removes leftovers of earlier runs and creates the tables used by the tests. */
+  override def beforeAll(): Unit = {
+    drop
+    sql("create table dim_table(name string,age int,height int) using carbondata")
+    sql("create table sdr_table(name varchar(20), score int) using carbondata")
+    sql("create table areas(aid int, title string, pid int) using carbondata")
+  }
+
+  /** Drops every table and datamap created by this suite. */
+  override def afterAll(): Unit = {
+    drop
+  }
+
+  // `areas` is joined with itself (parent row p, child row c).
+  test("test mv self join") {
+    sql("insert into areas select 130000, 'hebei', null")
+    sql("insert into areas select 130100, 'shijiazhuang', 130000")
+    sql("insert into areas select 130400, 'handan', 130000")
+    sql("insert into areas select 410000, 'henan', null")
+    sql("insert into areas select 410300, 'luoyang', 410000")
+
+    val mvSQL =
+      s"""select p.title,c.title
+         |from areas as p
+         |inner join areas as c on c.pid=p.aid
+         |where p.title = 'hebei'
+       """.stripMargin
+    sql("create datamap table_mv using 'mv' as " + mvSQL)
+    sql("rebuild datamap table_mv")
+    // Run the very same query the datamap was built from and expect a rewrite.
+    val frame = sql(mvSQL)
+    assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
+    checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan")))
+  }
+
+  // `dim_table` is left-joined twice to `sdr_table` under two different aliases.
+  test("test mv two join tables are same") {
+    // table_mv is also created by the self-join test; drop it so it can be recreated.
+    sql("drop datamap if exists table_mv")
+
+    sql("insert into dim_table select 'tom',20,170")
+    sql("insert into dim_table select 'lily',30,160")
+    sql("insert into sdr_table select 'tom',70")
+    sql("insert into sdr_table select 'tom',50")
+    sql("insert into sdr_table select 'lily',80")
+
+    val mvSQL =
+      s"""select sdr.name,sum(sdr.score),dim.age,dim_other.height from sdr_table sdr
+         | left join dim_table dim on sdr.name = dim.name
+         | left join dim_table dim_other on sdr.name = dim_other.name
+         | group by sdr.name,dim.age,dim_other.height
+       """.stripMargin
+    sql("create datamap table_mv using 'mv' as " + mvSQL)
+    sql("rebuild datamap table_mv")
+    val frame = sql(mvSQL)
+    assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
+    checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170)))
+  }
+
+  /**
+   * Checks whether the plan reads from the table backing the given datamap.
+   *
+   * @param logicalPlan plan to inspect
+   * @param dataMapName datamap name; the table looked for is `dataMapName + "_table"`
+   * @return true if some [[LogicalRelation]] in the plan has a catalog table with
+   *         that name, compared case-insensitively
+   */
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    // Relations without a catalog table are skipped rather than unwrapped with
+    // `.get`, which would throw on them.
+    val tables = logicalPlan.collect {
+      case l: LogicalRelation => l.catalogTable
+    }.flatten
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+  }
+
+  /** Drops the tables and the datamap used by this suite, if they exist. */
+  def drop: Unit = {
+    sql("drop table if exists areas")
+    sql("drop table if exists dim_table")
+    sql("drop table if exists sdr_table")
+    sql("drop datamap if exists table_mv")
+  }
+
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
index 7e1eb05..491d394 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -61,6 +61,21 @@ case class ModularRelation(databaseName: String,
 
   override def adjacencyList: Map[Int, Seq[(Int, JoinType)]] = Map.empty
 
+  /**
+   * Compares this relation with `that` by database name, table name and the string
+   * form of `output`.
+   *
+   * @param that value to compare with
+   * @return true when `that` is a [[ModularRelation]] whose `output.toString` equals
+   *         this one's and either both names are non-null here and equal to the
+   *         other relation's, or both names are null on both relations; false for
+   *         anything else, including values that are not a [[ModularRelation]]
+   */
+  def fineEquals(that: Any): Boolean = {
+    that match {
+      case other: ModularRelation =>
+        val sameNamedTable = databaseName != null && tableName != null &&
+          databaseName == other.databaseName && tableName == other.tableName
+        val bothUnnamed = databaseName == null && tableName == null &&
+          other.databaseName == null && other.tableName == null
+        (sameNamedTable || bothUnnamed) && output.toString == other.output.toString
+      case _ => false
+    }
+  }
+
   override def equals(that: Any): Boolean = {
     that match {
       case that: ModularRelation =>


Mime
View raw message