carbondata-commits mailing list archives

From akash...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3733] Fix Incorrect query results on mv with limit
Date Fri, 20 Mar 2020 10:49:53 GMT
This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new becccf5  [CARBONDATA-3733] Fix Incorrect query results on mv with limit
becccf5 is described below

commit becccf5b8bdf5f8f432de3a02507beaaa7cc18e5
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Mon Mar 2 21:28:40 2020 +0530

    [CARBONDATA-3733] Fix Incorrect query results on mv with limit
    
    Why is this PR needed?
    Issue 1:
    After creating a materialized view, queries with a simple projection and limit give
    incorrect results.
    Issue 2:
    Compacting IUD_DELTA on a materialized view throws a NullPointerException, because
    SegmentUpdateStatusManager is not set.
    Issue 3:
    Queries whose order by columns are not in the projection list of the created mv give
    incorrect results.
    
    What changes were proposed in this PR?
    Issue 1:
    Copy the subsume flags and flagSpec to the subsumer plan while rewriting with summary
    datasets. Update the flagSpec as per the mv attributes and copy it to the relation.
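
    For illustration, a minimal sketch of the affected query shape, mirroring the
    "test query on mv with limit" test added in TestAllOperationsOnMV (table and
    mv names come from that test):

      sql("create materialized view mv1 as select a.name,a.price from maintable a")
      // Before this fix, the rewritten plan dropped the subsume flags/flagSpec
      // (which carry the limit), so this query could return wrong rows from mv1:
      sql("select a.name,a.price from maintable a limit 1")
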
    Issue 2:
    Set SegmentUpdateStatusManager on the CarbonLoadModel using carbonTable in case of
    IUD_DELTA compaction.
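
    For illustration, a sketch of the scenario that hit the NullPointerException,
    following the new horizontal-compaction test added in TestAllOperationsOnMV:

      sql("create materialized view mv1 as select name,c_code from maintable")
      sql("update maintable set(name) = ('aaa') where c_code = 21")
      // Subsequent updates trigger horizontal compaction on the mv table, which
      // previously failed because no SegmentUpdateStatusManager was set:
      sql("update maintable set(name) = ('mmm') where c_code = 24")
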
    Issue 3:
    Order by columns have to be present in the projection list when creating an mv.
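
    For illustration, per the updated order-by test in TestAllOperationsOnMV:

      // Rejected now, since c_code is missing from the projection list:
      sql("create materialized view dm1 as select name from maintable order by c_code")
      // Accepted:
      sql("create materialized view dm1 as select name,c_code from maintable order by c_code")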
    
    This closes #3651
---
 docs/index/mv-guide.md                             |  2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     | 46 ----------------------
 .../CarbonAlterTableCompactionCommand.scala        | 14 +++++--
 .../apache/carbondata/mv/extension/MVHelper.scala  | 21 +++++++++-
 .../apache/carbondata/mv/extension/MVUtil.scala    | 31 +++------------
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  |  6 ++-
 .../carbondata/mv/rewrite/QueryRewrite.scala       | 10 ++++-
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  8 ++++
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 41 ++++++++++++++++++-
 9 files changed, 98 insertions(+), 81 deletions(-)

diff --git a/docs/index/mv-guide.md b/docs/index/mv-guide.md
index 2b8810c..b071967 100644
--- a/docs/index/mv-guide.md
+++ b/docs/index/mv-guide.md
@@ -74,7 +74,7 @@ EXPLAIN SELECT a, sum(b) from maintable group by a;
     GROUP BY country, sex
   ```
  **NOTE**:
- * Group by columns has to be provided in projection list while creating mv datamap
+ * Group by and Order by columns has to be provided in projection list while creating mv datamap
 * If only single parent table is involved in mv datamap creation, then TableProperties of Parent table
    (if not present in a aggregate function like sum(col)) listed below will be
    inherited to datamap table
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5875af6..1097b95 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,12 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.io.IOException
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -35,7 +31,6 @@ import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -120,52 +115,11 @@ class CarbonEnv {
         CarbonProperties.getInstance
           .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
         initialized = true
-        cleanChildTablesNotRegisteredInHive(sparkSession)
       }
     }
     Profiler.initialize(sparkSession.sparkContext)
     LOGGER.info("Initialize CarbonEnv completed...")
   }
-
-  private def cleanChildTablesNotRegisteredInHive(sparkSession: SparkSession): Unit = {
-    // If in case JDBC application is killed/stopped, when create datamap was in progress, datamap
-    // table was created and datampschema was saved to the system, but table was not registered to
-    // metastore. So, when we restart JDBC application, we need to clean up
-    // stale tables and datamapschema's.
-    val dataMapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas
-    dataMapSchemas.asScala.foreach {
-      dataMapSchema =>
-        if (null != dataMapSchema.getRelationIdentifier &&
-            !dataMapSchema.isIndexDataMap) {
-          val isTableExists = try {
-            sparkSession.sessionState
-              .catalog
-              .tableExists(TableIdentifier(dataMapSchema.getRelationIdentifier.getTableName,
-                Some(dataMapSchema.getRelationIdentifier.getDatabaseName)))
-          } catch {
-            // we need to take care of cleanup when the table does not exists, if table exists and
-            // some other user tries to access the table, it might fail, that time no need to handle
-            case ex: Exception =>
-              LOGGER.error("Error while checking the table existence", ex)
-              return
-          }
-          if (!isTableExists) {
-            try {
-              DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
-            } catch {
-              case e: IOException =>
-                throw e
-            } finally {
-            if (FileFactory.isFileExist(dataMapSchema.getRelationIdentifier.getTablePath)) {
-                CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(dataMapSchema
-                  .getRelationIdentifier
-                  .getTablePath))
-              }
-            }
-          }
-        }
-    }
-  }
 }
 
 /**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 681e8a0..192f2bc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -42,12 +42,11 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -220,17 +219,24 @@ case class CarbonAlterTableCompactionCommand(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       if (alterTableModel.segmentUpdateStatusManager.isDefined) {
         carbonLoadModel.setSegmentUpdateStatusManager(
           alterTableModel.segmentUpdateStatusManager.get)
         carbonLoadModel.setLoadMetadataDetails(
           alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
+      } else {
+        // segmentUpdateStatusManager will not be defined in case of IUD/horizontal compaction on
+        // materialized views. In that case, create new segmentUpdateStatusManager object
+        // using carbonTable
+        val segmentUpdateStatusManager = new SegmentUpdateStatusManager(carbonTable)
+        carbonLoadModel.setSegmentUpdateStatusManager(segmentUpdateStatusManager)
+        carbonLoadModel.setLoadMetadataDetails(
+          segmentUpdateStatusManager.getLoadMetadataDetails.toList.asJava)
       }
     }
 
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       carbonLoadModel.readAndSetLoadMetadataDetails()
     }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
index 7648efe..4e7835b 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, NamedExpression, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -85,6 +85,25 @@ object MVHelper {
                                                   "with limit")
       case _ =>
     }
+
+    // Order by columns needs to be present in projection list for creating mv. This is because,
+    // we have to perform order by on all segments during query, which requires the order by column
+    // data
+    queryPlan.transform {
+      case sort@Sort(order, _, _) =>
+        order.map { orderByCol =>
+          orderByCol.child match {
+            case attr: AttributeReference =>
+              if (!queryPlan.output.contains(attr.toAttribute)) {
+                throw new UnsupportedOperationException(
+                  "Order by column `" + attr.name + "` must be present in project columns")
+              }
+          }
+          order
+        }
+        sort
+    }
+
     val mainTables = getCatalogTables(queryPlan)
     if (mainTables.isEmpty) {
       throw new MalformedCarbonCommandException(
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
index 47c7f23..c852331 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
@@ -47,20 +47,16 @@ class MVUtil {
       case select: Select =>
         select.children.map {
           case groupBy: GroupBy =>
-            getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
-              relations, groupBy.flagSpec)
+            getFieldsFromProject(groupBy.outputList, groupBy.predicateList, relations)
           case _: ModularRelation =>
-            getFieldsFromProject(select.outputList, select.predicateList,
-              relations, select.flagSpec)
+            getFieldsFromProject(select.outputList, select.predicateList, relations)
         }.head
       case groupBy: GroupBy =>
         groupBy.child match {
           case select: Select =>
-            getFieldsFromProject(groupBy.outputList, select.predicateList,
-              relations, select.flagSpec)
+            getFieldsFromProject(groupBy.outputList, select.predicateList, relations)
           case _: ModularRelation =>
-            getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
-              relations, groupBy.flagSpec)
+            getFieldsFromProject(groupBy.outputList, groupBy.predicateList, relations)
         }
     }
   }
@@ -71,14 +67,12 @@ class MVUtil {
    * @param outputList of the modular plan
    * @param predicateList of the modular plan
    * @param relations list of main table from query
-   * @param flagSpec to get SortOrder attribute if exists
    * @return fieldRelationMap
    */
   private def getFieldsFromProject(
       outputList: Seq[NamedExpression],
       predicateList: Seq[Expression],
-      relations: Seq[Relation],
-      flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, MVField] = {
+      relations: Seq[Relation]): mutable.LinkedHashMap[Field, MVField] = {
     var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
     fieldToDataMapFieldMap ++= getFieldsFromProject(outputList, relations)
     var finalPredicateList: Seq[NamedExpression] = Seq.empty
@@ -88,21 +82,6 @@ class MVUtil {
           finalPredicateList = finalPredicateList.:+(attr)
       }
     }
-    // collect sort by columns
-    if (flagSpec.nonEmpty) {
-      flagSpec.map { f =>
-        f.map {
-          case list: ArrayBuffer[_] =>
-            list.map {
-              case s: SortOrder =>
-                s.collect {
-                  case attr: AttributeReference =>
-                    finalPredicateList = finalPredicateList.:+(attr)
-                }
-            }
-        }
-      }
-    }
     fieldToDataMapFieldMap ++= getFieldsFromProject(finalPredicateList.distinct, relations)
     fieldToDataMapFieldMap
   }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index e72ce94..b967686 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -289,7 +289,11 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper
 
           if (r2eJoinsMatch) {
             if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
-              Seq(sel_1a)
+              if (sel_1q.flagSpec.isEmpty) {
+                Seq(sel_1a)
+              } else {
+                Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec))
+              }
             } else {
               // no compensation needed
               val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
index 405cd7c..07015d1 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
@@ -235,6 +235,13 @@ class QueryRewrite private (
       case s: Select if s.dataMapTableRelation.isDefined =>
         val relation =
           s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
+        val aliasMap = getAttributeMap(relation.outputList, s.outputList)
+        // Update the flagspec as per the mv table attributes.
+        val updatedFlagSpec: Seq[Seq[Any]] = updateFlagSpec(
+          keepAlias = false,
+          s,
+          relation,
+          aliasMap)
         val outputList = getUpdatedOutputList(relation.outputList, s.dataMapTableRelation)
         // when the output list contains multiple projection of same column, but relation
         // contains distinct columns, mapping may go wrong with columns, so select distinct
@@ -242,7 +249,8 @@ class QueryRewrite private (
         val oList = for ((o1, o2) <- mappings) yield {
           if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
         }
-        relation.copy(outputList = oList).setRewritten()
+        relation.copy(outputList = oList, flags = s.flags, flagSpec = updatedFlagSpec)
+          .setRewritten()
       case g: GroupBy if g.dataMapTableRelation.isDefined =>
         val relation =
           g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 773482f..257d087 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -135,6 +135,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
         "deptno, deptname, salary, empname from fact_table1")
     sql("select * from source limit 2").show(false)
     sql("create materialized view mv1 as select empname, deptname, avg(salary) from source
group by empname, deptname")
+    assert(sql(" select empname, deptname, avg(salary) from source group by empname, deptname
limit 2").collect().length == 2)
     var df = sql("select empname, avg(salary) from source group by empname")
     assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
     checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
@@ -280,6 +281,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("create materialized view mv5 as select empname, designation from fact_table1 where
empname='shivani'")
     val frame = sql("select designation from fact_table1 where empname='shivani'")
     assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv5"))
+    assert(sql("select designation from fact_table1 where empname='shivani' limit 5").collect().length
== 5)
     checkAnswer(frame, sql("select designation from fact_table2 where empname='shivani'"))
     sql(s"drop materialized view mv5")
   }
@@ -296,6 +298,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("create materialized view mv7 as select empname, designation from fact_table1 where
empname='shivani'")
     val frame = sql(
       "select empname,designation from fact_table1 where empname='shivani' and designation='SA'")
+    assert( sql("select empname,designation from fact_table1 where empname='shivani' and
designation='SA' limit 1").collect().length == 0)
     assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv7"))
     checkAnswer(frame, sql("select empname,designation from fact_table2 where empname='shivani'
and designation='SA'"))
     sql(s"drop materialized view mv7")
@@ -394,6 +397,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
       "select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1
group" +
       " by empname")
     assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv17"))
+    assert(sql("select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from
fact_table1 group by empname limit 2").collect().length == 2)
     checkAnswer(frame, sql("select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE
0 END) from fact_table2 group" +
                            " by empname"))
     sql(s"drop materialized view mv17")
@@ -437,6 +441,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     val frame = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname
= t2.empname")
     assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv21"))
+    assert(sql("select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1
group by empname limit 2").collect().length == 2)
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5
t2 where t1.empname = t2.empname"))
     sql(s"drop materialized view mv21")
   }
@@ -485,6 +490,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     val frame1 = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2
t2 on (t1.empname = t2.empname) inner join fact_table3 t3  on (t1.empname=t3.empname)")
     assert(TestUtil.verifyMVDataMap(frame1.queryExecution.optimizedPlan, "mv25"))
+    assert(sql("select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2
t2 on (t1.empname = t2.empname) inner join fact_table3 t3  on (t1.empname=t3.empname) limit
2").collect().length == 2)
     checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5
t2 where t1.empname = t2.empname"))
     sql(s"drop materialized view mv25")
   }
@@ -508,6 +514,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "mv27"))
     checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4
t1,fact_table5 t2  " +
                            "where t1.empname = t2.empname group by t1.empname, t2.designation"))
+    assert(sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2
t2  where t1.empname = t2.empname group by t1.empname, t2.designation limit 2").collect().length
== 2)
     sql(s"drop materialized view mv27")
   }
 
@@ -759,6 +766,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     val frame = sql(
       "select empname,sum(salary) as total from fact_table1 group by empname order by empname")
     assert(TestUtil.verifyMVDataMap(frame.queryExecution.optimizedPlan, "MV_order"))
+    assert(sql("select empname,sum(salary) as total from fact_table1 group by empname order
by empname limit 2").collect().length == 2)
   }
 
   test("jira carbondata-2528-2") {
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 66b0c38..c5211c3 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -575,16 +575,55 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     sql("insert into table maintable select 'abc',21,2000")
     val res = sql("select name from maintable order by c_code")
     sql("drop materialized view if exists dm1")
-    sql("create materialized view dm1  as select name from maintable order by c_code")
+    intercept[UnsupportedOperationException] {
+      sql("create materialized view dm1  as select name from maintable order by c_code")
+    }.getMessage.contains("Order by column `c_code` must be present in project columns")
+    sql("create materialized view dm1  as select name,c_code from maintable order by c_code")
     val df = sql("select name from maintable order by c_code")
     TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "dm1")
     checkAnswer(res, df)
     intercept[Exception] {
       sql("alter table maintable drop columns(c_code)")
     }.getMessage.contains("Column name cannot be dropped because it exists in mv materialized
view: dm1")
+    sql("insert into table maintable select 'mno',20,2000")
+    checkAnswer(sql("select name from maintable order by c_code"), Seq(Row("mno"), Row("abc")))
+    sql("drop table if exists maintable")
+  }
+
+  test("test query on mv with limit") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) STORED AS carbondata")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("insert into table maintable select 'bcd',22,2000")
+    sql("insert into table maintable select 'def',22,2000")
+    sql("drop materialized view if exists mv1")
+    sql("create materialized view mv1  as select a.name,a.price from maintable a")
+    var dataFrame = sql("select a.name,a.price from maintable a limit 1")
+    assert(dataFrame.count() == 1)
+    TestUtil.verifyMVDataMap(dataFrame.queryExecution.optimizedPlan, "mv1")
+    dataFrame = sql("select a.name,a.price from maintable a order by a.name limit 1")
+    assert(dataFrame.count() == 1)
+    TestUtil.verifyMVDataMap(dataFrame.queryExecution.optimizedPlan, "mv1")
     sql("drop table if exists maintable")
   }
 
+  test("test horizontal comapction on mv for more than two update") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) STORED AS carbondata")
+    sql("insert into table maintable values('abc',21,2000),('mno',24,3000)")
+    sql("drop materialized view if exists mv1")
+    sql("create materialized view mv1  as select name,c_code from maintable")
+    sql("update maintable set(name) = ('aaa') where c_code = 21").show(false)
+    var df = sql("select name,c_code from maintable")
+    TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1")
+    checkAnswer(df, Seq(Row("aaa",21), Row("mno",24)))
+    sql("update maintable set(name) = ('mmm') where c_code = 24").show(false)
+    df = sql("select name,c_code from maintable")
+    TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1")
+    checkAnswer(df, Seq(Row("aaa",21), Row("mmm",24)))
+    sql("drop table IF EXISTS maintable")
+  }
+
   test("drop meta cache on mv materialized view table") {
     defaultConfig()
     sql("drop table IF EXISTS maintable")

