carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kunalkap...@apache.org
Subject [2/2] carbondata git commit: [HOTFIX][PR 2575] Fixed modular plan creation only if valid datamaps are available
Date Thu, 02 Aug 2018 11:23:29 GMT
[HOTFIX][PR 2575] Fixed modular plan creation only if valid datamaps are available

update query is failing in spark-2.2 cluster if mv jars are available because catalogs
are not empty if datamap are created for other table also and returns true from isValidPlan()
inside MVAnalyzerRule.

This closes #2579


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b65bf9bc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b65bf9bc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b65bf9bc

Branch: refs/heads/master
Commit: b65bf9bc7104cbcfad1277c99090853d9e7b0386
Parents: f52c133
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Mon Jul 30 15:00:00 2018 +0530
Committer: kunal642 <kunalkapoor642@gmail.com>
Committed: Thu Aug 2 16:52:21 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapCatalog.java |  4 +-
 .../carbondata/mv/datamap/MVAnalyzerRule.scala  | 57 ++++++++++++++++----
 .../mv/rewrite/SummaryDatasetCatalog.scala      |  9 +++-
 .../mv/rewrite/MVCreateTestCase.scala           |  4 ++
 4 files changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
index 89f2838..5dd4871 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapCatalog.java
@@ -38,10 +38,10 @@ public interface DataMapCatalog<T> {
   void unregisterSchema(String dataMapName);
 
   /**
-   * List all registered schema catalogs
+   * List all registered valid schema catalogs
    * @return
    */
-  T[] listAllSchema();
+  T[] listAllValidSchema();
 
   /**
    * It reloads/removes all registered schema catalogs

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index 483780f..9e0f8e5 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -16,8 +16,11 @@
  */
 package org.apache.carbondata.mv.datamap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeserializeToObject, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -79,27 +82,59 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan]
{
     }
   }
 
+  /**
+   * Whether the plan is valid for doing modular plan matching and datamap replacing.
+   */
   def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = {
-    !plan.isInstanceOf[Command] && !isDataMapExists(plan, catalog.listAllSchema())
&&
-    !plan.isInstanceOf[DeserializeToObject]
+    if (!plan.isInstanceOf[Command]  && !plan.isInstanceOf[DeserializeToObject])
{
+      val catalogs = extractCatalogs(plan)
+      !isDataMapReplaced(catalog.listAllValidSchema(), catalogs) &&
+      isDataMapExists(catalog.listAllValidSchema(), catalogs)
+    } else {
+      false
+    }
+
   }
   /**
    * Check whether datamap table already updated in the query.
    *
-   * @param plan
-   * @param mvs
-   * @return
+   * @param mvdataSetArray Array of available mvdataset which include modular plans
+   * @return Boolean whether already datamap replaced in the plan or not
    */
-  def isDataMapExists(plan: LogicalPlan, mvs: Array[SummaryDataset]): Boolean = {
-    val catalogs = plan collect {
-      case l: LogicalRelation => l.catalogTable
-    }
-    catalogs.isEmpty || catalogs.exists { c =>
-      mvs.exists { mv =>
+  def isDataMapReplaced(
+      mvdataSetArray: Array[SummaryDataset],
+      catalogs: Seq[Option[CatalogTable]]): Boolean = {
+    catalogs.exists { c =>
+      mvdataSetArray.exists { mv =>
         val identifier = mv.dataMapSchema.getRelationIdentifier
         identifier.getTableName.equals(c.get.identifier.table) &&
         identifier.getDatabaseName.equals(c.get.database)
       }
     }
   }
+
+  /**
+   * Check whether any suitable datamaps(like datamap which parent tables are present in
the plan)
+   * exists for this plan.
+   *
+   * @param mvs
+   * @return
+   */
+  def isDataMapExists(mvs: Array[SummaryDataset], catalogs: Seq[Option[CatalogTable]]): Boolean
= {
+    catalogs.exists { c =>
+      mvs.exists { mv =>
+        mv.dataMapSchema.getParentTables.asScala.exists { identifier =>
+          identifier.getTableName.equals(c.get.identifier.table) &&
+          identifier.getDatabaseName.equals(c.get.database)
+        }
+      }
+    }
+  }
+
+  private def extractCatalogs(plan: LogicalPlan): Seq[Option[CatalogTable]] = {
+    val catalogs = plan collect {
+      case l: LogicalRelation => l.catalogTable
+    }
+    catalogs
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
index 210ff65..026d6b7 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -152,7 +152,14 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
   }
 
 
-  override def listAllSchema(): Array[SummaryDataset] = summaryDatasets.toArray
+  override def listAllValidSchema(): Array[SummaryDataset] = {
+    val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails
+    // Only select the enabled datamaps for the query.
+    val enabledDataSets = summaryDatasets.filter { p =>
+      statusDetails.exists(_.getDataMapName.equalsIgnoreCase(p.dataMapSchema.getDataMapName))
+    }
+    enabledDataSets.toArray
+  }
 
   /**
    * API for test only

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b65bf9bc/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 0b96202..9f834a9 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -889,7 +889,10 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("basic scenario") {
 
     sql("drop table if exists mvtable1")
+    sql("drop table if exists mvtable2")
     sql("create table mvtable1(name string,age int,salary int) stored by 'carbondata'")
+    sql("create table mvtable2(name string,age int,salary int) stored by 'carbondata'")
+    sql("create datamap MV11 using 'mv' as select name from mvtable2")
     sql(" insert into mvtable1 select 'n1',12,12")
     sql("  insert into mvtable1 select 'n1',12,12")
     sql(" insert into mvtable1 select 'n3',12,12")
@@ -897,6 +900,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("update mvtable1 set(name) = ('updatedName')").show()
     checkAnswer(sql("select count(*) from mvtable1 where name = 'updatedName'"),Seq(Row(4)))
     sql("drop table if exists mvtable1")
+    sql("drop table if exists mvtable2")
   }
 
   def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {


Mime
View raw message