carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3295] Fix that MV datamap throw exception because its rewrite algorithm when query SQL has multiply subquery
Date Sun, 19 May 2019 16:06:32 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 789c97e  [CARBONDATA-3295] Fix that MV datamap throw exception because its rewrite
algorithm when query SQL has multiply subquery
789c97e is described below

commit 789c97e4196edbe454bd83730b36cd21f72ce0cd
Author: qiuchenjian <807169000@qq.com>
AuthorDate: Sun Feb 17 18:52:09 2019 +0800

    [CARBONDATA-3295] Fix that MV datamap throw exception because its rewrite algorithm when
query SQL has multiply subquery
    
    [Problem]
    Error:
    
    java.lang.UnsupportedOperationException was thrown.
    java.lang.UnsupportedOperationException
    	at org.apache.carbondata.mv.plans.util.SQLBuildDSL.productArity(SQLBuildDSL.scala:36)
    	at scala.runtime.ScalaRunTime$$anon$1.<init>(ScalaRunTime.scala:174)
    	at scala.runtime.ScalaRunTime$.typedProductIterator(ScalaRunTime.scala:172)

    [Scenario]
    create datamap data_table_mv using 'mv' as
     SELECT STARTTIME,LAYER4ID,
     COALESCE (SUM(seq),0) AS seq_c,
     COALESCE (SUM(succ),0)  AS succ_c
     FROM data_table
     GROUP BY STARTTIME,LAYER4ID

    [Query]
     SELECT MT.`3600` AS `3600`,
     MT.`2250410101` AS `2250410101`,
     (CASE WHEN (SUM(COALESCE(seq_c, 0))) = 0 THEN NULL
       ELSE
       (CASE WHEN (CAST((SUM(COALESCE(seq_c, 0))) AS int)) = 0 THEN 0
         ELSE ((CAST((SUM(COALESCE(succ_c, 0))) AS double))
         / (CAST((SUM(COALESCE(seq_c, 0))) AS double)))
         END) * 100
       END) AS rate
     FROM (
       SELECT sum_result.*, H_REGION.`2250410101` FROM
       (SELECT cast(floor((starttime + 28800) / 3600) * 3600 - 28800 as int) AS `3600`,
         LAYER4ID,
         COALESCE(SUM(seq), 0) AS seq_c,
         COALESCE(SUM(succ), 0) AS succ_c
           FROM data_table
           WHERE STARTTIME >= 1549866600 AND STARTTIME < 1549899900
           GROUP BY cast(floor((STARTTIME + 28800) / 3600) * 3600 - 28800 as int),LAYER4ID
       )sum_result
       LEFT JOIN
       (SELECT l4id AS `225040101`,
         l4name AS `2250410101`,
         l4name AS NAME_2250410101
           FROM region
           GROUP BY l4id, l4name) H_REGION
       ON sum_result.LAYER4ID = H_REGION.`225040101`
     WHERE H_REGION.NAME_2250410101 IS NOT NULL
     ) MT
     GROUP BY MT.`3600`, MT.`2250410101`
     ORDER BY `3600` ASC LIMIT 5000
    [Root Cause]
          // TODO Find a better way to set the rewritten flag, it may fail in some conditions.
          val mapping =
            rewrittenPlan.collect { case m: ModularPlan => m } zip
            updatedDataMapTablePlan.collect { case m: ModularPlan => m }
          mapping.foreach(f => if (f._1.rewritten) f._2.setRewritten())
    This rewrite algorithm has a bug: the collected nodes are not in the same order in some scenarios,
so zipping the two collected lists pairs up the wrong nodes.
    
    [Solution]
    Fix the MV rewrite algorithm: we can compare the Select and GroupBy objects
between the original plan and the rewritten plan, but the ModularPlan tree has been changed,
so we cannot compare their children; this PR adds coarseEqual for the comparison.
    
    This closes #3129
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  6 --
 .../carbondata/mv/rewrite/MVRewriteTestCase.scala  | 96 ++++++++++++++++++++++
 .../mv/plans/modular/basicOperators.scala          | 12 +++
 3 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 4c7fbc4..8baa924 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -596,12 +596,6 @@ object MVHelper {
         case g: GroupBy =>
           MVHelper.updateDataMap(g, rewrite)
       }
-      // TODO Find a better way to set the rewritten flag, it may fail in some conditions.
-      val mapping =
-        rewrittenPlan.collect { case m: ModularPlan => m } zip
-        updatedDataMapTablePlan.collect { case m: ModularPlan => m }
-      mapping.foreach(f => if (f._1.rewritten) f._2.setRewritten())
-
       updatedDataMapTablePlan
     } else {
       rewrittenPlan
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
new file mode 100644
index 0000000..3f5164f
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVRewriteTestCase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class MVRewriteTestCase extends QueryTest with BeforeAndAfterAll {
+
+
+  override def beforeAll(): Unit = {
+    drop
+    sql("create table region(l4id string,l4name string) using carbondata")
+    sql(
+      s"""create table data_table(
+         	         |starttime int, seq long,succ long,LAYER4ID string,tmp int)
+         	         |using carbondata""".stripMargin)
+  }
+
+  def drop(): Unit ={
+    sql("drop table if exists region")
+    sql("drop table if exists data_table")
+  }
+
+  test("test mv count and case when expression") {
+    sql("drop datamap if exists data_table_mv")
+    sql(s"""create datamap data_table_mv using 'mv' as
+           	           | SELECT STARTTIME,LAYER4ID,
+           	           | COALESCE (SUM(seq),0) AS seq_c,
+           	           | COALESCE (SUM(succ),0)  AS succ_c
+           	           | FROM data_table
+           	           | GROUP BY STARTTIME,LAYER4ID""".stripMargin)
+
+    sql("rebuild datamap data_table_mv")
+
+    var frame = sql(s"""SELECT  MT.`3600` AS `3600`,
+                       	                       | MT.`2250410101` AS `2250410101`,
+                       	                       | (CASE WHEN (SUM(COALESCE(seq_c, 0))) = 0
THEN NULL
+                       	                       |   ELSE
+                       	                       |   (CASE WHEN (CAST((SUM(COALESCE(seq_c,
0))) AS int)) = 0 THEN 0
+                       	                       |     ELSE ((CAST((SUM(COALESCE(succ_c, 0)))
AS double))
+                       	                       |     / (CAST((SUM(COALESCE(seq_c, 0))) AS
double)))
+                       	                       |     END) * 100
+                       	                       |   END) AS rate
+                       	                       | FROM (
+                       	                       |   SELECT sum_result.*, H_REGION.`2250410101`
FROM
+                       	                       |   (SELECT cast(floor((starttime + 28800)
/ 3600) * 3600 - 28800 as int) AS `3600`,
+                       	                       |     LAYER4ID,
+                       	                       |     COALESCE(SUM(seq), 0) AS seq_c,
+                       	                       |     COALESCE(SUM(succ), 0) AS succ_c
+                       	                       |       FROM data_table
+                       	                       |       WHERE STARTTIME >= 1549866600 AND
STARTTIME < 1549899900
+                       	                       |       GROUP BY cast(floor((STARTTIME + 28800)
/ 3600) * 3600 - 28800 as int),LAYER4ID
+                       	                       |   )sum_result
+                       	                       |   LEFT JOIN
+                       	                       |   (SELECT l4id AS `225040101`,
+                       	                       |     l4name AS `2250410101`,
+                       	                       |     l4name AS NAME_2250410101
+                       	                       |       FROM region
+                       	                       |       GROUP BY l4id, l4name) H_REGION
+                       	                       |   ON sum_result.LAYER4ID = H_REGION.`225040101`
+                       	                       | WHERE H_REGION.NAME_2250410101 IS NOT NULL
+                       	                       | ) MT
+                       	                       | GROUP BY MT.`3600`, MT.`2250410101`
+                       	                       | ORDER BY `3600` ASC LIMIT 5000""".stripMargin)
+
+    assert(verifyMVDataMap(frame.queryExecution.analyzed, "data_table_mv"))
+  }
+
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+  }
+
+  override def afterAll(): Unit = {
+    drop
+  }
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
index 0a76da1..18b1fee 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
@@ -39,6 +39,12 @@ case class GroupBy(
     flagSpec: Seq[Seq[Any]],
     dataMapTableRelation: Option[ModularPlan] = None) extends UnaryNode with Matchable {
   override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+
+  override def makeCopy(newArgs: Array[AnyRef]): GroupBy = {
+    val groupBy = super.makeCopy(newArgs).asInstanceOf[GroupBy]
+    if (rewritten) groupBy.setRewritten()
+    groupBy
+  }
 }
 
 case class Select(
@@ -74,6 +80,12 @@ case class Select(
   override def extractEvaluableConditions(plan: ModularPlan): Seq[Expression] = {
     predicateList.filter(p => canEvaluate(p, plan))
   }
+
+  override def makeCopy(newArgs: Array[AnyRef]): Select = {
+    val select = super.makeCopy(newArgs).asInstanceOf[Select]
+    if (rewritten) select.setRewritten()
+    select
+  }
 }
 
 case class Union(children: Seq[ModularPlan], flags: FlagSet, flagSpec: Seq[Seq[Any]])


Mime
View raw message