carbondata-commits mailing list archives

From: akash...@apache.org
Subject: [carbondata] branch master updated: [CARBONDATA-4093] Added logs for MV and method to verify if mv is in Sync during query
Date: Mon, 21 Dec 2020 11:17:45 GMT
This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new aae93c1  [CARBONDATA-4093] Added logs for MV and method to verify if mv is in Sync during query
aae93c1 is described below

commit aae93c1bf79e16b10f28f17ab8a07a824903bf00
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Sat Dec 19 16:09:26 2020 +0530

    [CARBONDATA-4093] Added logs for MV and method to verify if mv is in Sync during query
    
    Why is this PR needed?
    MV handling lacks diagnostic logging, and query rewrite has no way to verify that an MV is in sync with its parent tables before using it.
    
    What changes were proposed in this PR?
    1. Move the MV enable check to the beginning, to avoid transforming the logical plan unnecessarily
    2. Add a log message when an exception occurs while fetching mv schemas
    3. Check whether the MV is in sync with its parent tables before allowing query rewrite
    4. Reuse the already-read LoadMetadataDetails to build the mergedLoadMapping
    5. Set no-dictionary schema data types for the insert-partition flow - missed from [CARBONDATA-4077]
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4060
---
 .../core/statusmanager/SegmentStatusManager.java   | 53 +++++++++++++++++-----
 .../org/apache/carbondata/core/view/MVManager.java | 26 ++++++++++-
 .../apache/carbondata/core/view/MVProvider.java    | 36 +++++++++++----
 .../apache/carbondata/view/MVCatalogInSpark.scala  |  4 ++
 .../apache/carbondata/view/MVManagerInSpark.scala  |  7 +--
 .../org/apache/carbondata/view/MVRefresher.scala   | 36 +++++++--------
 .../apache/spark/sql/optimizer/MVRewriteRule.scala | 28 ++++++------
 .../processing/sort/sortdata/SortParameters.java   |  4 +-
 .../processing/util/CarbonDataProcessorUtil.java   | 20 +++++++-
 9 files changed, 152 insertions(+), 62 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 4af4a55..5b800a2 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -142,6 +142,7 @@ public class SegmentStatusManager {
     List<Segment> listOfInvalidSegments = new ArrayList<>(10);
     List<Segment> listOfStreamSegments = new ArrayList<>(10);
     List<Segment> listOfInProgressSegments = new ArrayList<>(10);
+    Map<String, List<String>> mergedLoadMapping = new HashMap<>();
 
     try {
       if (loadMetadataDetails == null) {
@@ -204,6 +205,20 @@ public class SegmentStatusManager {
             || SegmentStatus.COMPACTED == segment.getSegmentStatus()
             || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
           listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+          if (SegmentStatus.COMPACTED == segment.getSegmentStatus()) {
+            // After main table compaction, segment mapping of child tables may not be updated.
+            // In order to check if main table and child table are in sync after compaction,
+            // check the main table's merged segment's map. ex: {0.1 -> 0,1,2,3}
+            if (null != segment.getMergedLoadName()) {
+              if (mergedLoadMapping.containsKey(segment.getMergedLoadName())) {
+                mergedLoadMapping.get(segment.getMergedLoadName()).add(segment.getLoadName());
+              } else {
+                List<String> mergedLoads = new ArrayList<>();
+                mergedLoads.add(segment.getLoadName());
+                mergedLoadMapping.put(segment.getMergedLoadName(), mergedLoads);
+              }
+            }
+          }
         } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() ||
             SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) {
           listOfInProgressSegments.add(
@@ -215,7 +230,7 @@ public class SegmentStatusManager {
       throw e;
     }
     return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
-        listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments);
+        listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments, mergedLoadMapping);
   }
 
   /**
@@ -235,24 +250,31 @@ public class SegmentStatusManager {
   }
 
   /**
-   * Returns valid segment list for a given RelationIdentifier
-   *
-   * @param relationIdentifier get list of segments for relation identifier
-   * @return list of valid segment id's
+   * Returns valid segment list from validAndInvalidSegmentsInfo
    */
-  public static List<String> getValidSegmentList(RelationIdentifier relationIdentifier)
-      throws IOException {
+  public static List<String> getValidSegmentList(
+      ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo) {
     List<String> segmentList = new ArrayList<>();
-    List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier
-        .from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
-            relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments();
-    for (Segment segment : validSegments) {
+    for (Segment segment : validAndInvalidSegmentsInfo.getValidSegments()) {
       segmentList.add(segment.getSegmentNo());
     }
     return segmentList;
   }
 
   /**
+   * Returns ValidAndInvalidSegmentsInfo for a given RelationIdentifier
+   *
+   * @param relationIdentifier get list of segments for relation identifier
+   * @return validAndInvalidSegmentsInfo
+   */
+  public static ValidAndInvalidSegmentsInfo getValidAndInvalidSegmentsInfo(
+      RelationIdentifier relationIdentifier) throws IOException {
+    return new SegmentStatusManager(AbsoluteTableIdentifier
+        .from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
+            relationIdentifier.getTableName())).getValidAndInvalidSegments();
+  }
+
+  /**
    * Reads the table status file with the specified UUID if non empty.
    */
   public static LoadMetadataDetails[] readLoadMetadata(String metaDataFolderPath, String uuid)
@@ -843,15 +865,18 @@ public class SegmentStatusManager {
     private final List<Segment> listOfInvalidSegments;
     private final List<Segment> listOfStreamSegments;
     private final List<Segment> listOfInProgressSegments;
+    Map<String, List<String>> mergedLoadMapping;
 
     private ValidAndInvalidSegmentsInfo(List<Segment> listOfValidSegments,
         List<Segment> listOfValidUpdatedSegments, List<Segment> listOfInvalidUpdatedSegments,
-        List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments) {
+        List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments,
+        Map<String, List<String>> mergedLoadMapping) {
       this.listOfValidSegments = listOfValidSegments;
       this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
       this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
       this.listOfStreamSegments = listOfStreamSegments;
       this.listOfInProgressSegments = listOfInProgressSegments;
+      this.mergedLoadMapping = mergedLoadMapping;
     }
 
     public List<Segment> getInvalidSegments() {
@@ -869,6 +894,10 @@ public class SegmentStatusManager {
     public List<Segment> getListOfInProgressSegments() {
       return listOfInProgressSegments;
     }
+
+    public Map<String, List<String>> getMergedLoadMapping() {
+      return mergedLoadMapping;
+    }
   }
 
   /**
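
To make the new bookkeeping concrete: mergedLoadMapping records, for each compacted
segment, the source loads that were merged into it (e.g. {0.1 -> [0, 1, 2, 3]}). A minimal
standalone Java sketch of that logic, using made-up segment names rather than CarbonData's
LoadMetadataDetails:

    import java.util.*;

    public class MergedLoadMappingSketch {
      public static void main(String[] args) {
        // (loadName, mergedLoadName) pairs as they might look after compaction
        String[][] segments = {{"0", "0.1"}, {"1", "0.1"}, {"2", "0.1"}, {"3", "0.1"}, {"4", null}};
        Map<String, List<String>> mergedLoadMapping = new HashMap<>();
        for (String[] segment : segments) {
          String loadName = segment[0];
          String mergedLoadName = segment[1];
          if (mergedLoadName != null) {
            // same effect as the containsKey/put branch in the hunk above
            mergedLoadMapping.computeIfAbsent(mergedLoadName, k -> new ArrayList<>()).add(loadName);
          }
        }
        System.out.println(mergedLoadMapping); // {0.1=[0, 1, 2, 3]}
      }
    }
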
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index 62f0583..284a672 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -75,6 +75,10 @@ public abstract class MVManager {
     return false;
   }
 
+  public boolean isMVInSyncWithParentTables(MVSchema mvSchema) throws IOException {
+    return schemaProvider.isViewCanBeEnabled(mvSchema, true);
+  }
+
   /**
    * It gives all mv schemas of a given table.
    * For show mv command.
@@ -114,7 +118,16 @@ public abstract class MVManager {
   public List<MVSchema> getSchemas() throws IOException {
     List<MVSchema> schemas = new ArrayList<>();
     for (String database : this.getDatabases()) {
-      schemas.addAll(this.getSchemas(database));
+      try {
+        schemas.addAll(this.getSchemas(database));
+      } catch (IOException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        LOGGER.error("Exception Occurred: Skipping MV schemas from database: " + database);
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(ex.getMessage());
+        }
+      }
     }
     return schemas;
   }
@@ -234,7 +247,16 @@ public abstract class MVManager {
   public List<MVStatusDetail> getEnabledStatusDetails() throws IOException {
     List<MVStatusDetail> statusDetails = new ArrayList<>();
     for (String database : this.getDatabases()) {
-      statusDetails.addAll(this.getEnabledStatusDetails(database));
+      try {
+        statusDetails.addAll(this.getEnabledStatusDetails(database));
+      } catch (IOException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        LOGGER.error("Exception Occurred: Skipping MV schemas from database: " + database);
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(ex.getMessage());
+        }
+      }
     }
     return statusDetails;
   }
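
Both hunks in this file apply the same pattern: an IOException still propagates, while any
other per-database failure is logged and that database is skipped. A self-contained sketch of
the pattern, with a hypothetical loadSchemas helper standing in for getSchemas(database):

    import java.io.IOException;
    import java.util.*;

    public class SkipFaultyDatabaseSketch {
      public static void main(String[] args) throws IOException {
        List<String> schemas = new ArrayList<>();
        for (String database : Arrays.asList("db_ok", "db_broken")) {
          try {
            schemas.addAll(loadSchemas(database));
          } catch (IOException ex) {
            // IO failures still abort the whole listing, as before the patch
            throw ex;
          } catch (Exception ex) {
            // anything else only skips this database
            System.err.println("Skipping MV schemas from database: " + database);
          }
        }
        System.out.println(schemas); // [db_ok.mv1]
      }

      private static List<String> loadSchemas(String database) throws IOException {
        if (database.equals("db_broken")) {
          throw new IllegalStateException("corrupt schema file");
        }
        return Collections.singletonList(database + ".mv1");
      }
    }
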
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 9aa5cf3..cf5f6a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -27,6 +27,7 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -246,7 +248,7 @@ public class MVProvider {
         LOG.info("Materialized view status lock has been successfully acquired.");
         if (status == MVStatus.ENABLED) {
           // Enable mv only if mv tables and main table are in sync
-          if (!isViewCanBeEnabled(schemaList.get(0))) {
+          if (!isViewCanBeEnabled(schemaList.get(0), false)) {
             return;
           }
         }
@@ -336,10 +338,12 @@ public class MVProvider {
    * @return flag to enable or disable mv
    * @throws IOException
    */
-  private static boolean isViewCanBeEnabled(MVSchema schema)
+  public boolean isViewCanBeEnabled(MVSchema schema, boolean ignoreDeferredCheck)
       throws IOException {
-    if (!schema.isRefreshIncremental()) {
-      return true;
+    if (!ignoreDeferredCheck) {
+      if (!schema.isRefreshIncremental()) {
+        return true;
+      }
     }
     boolean isViewCanBeEnabled = true;
     String viewMetadataPath =
@@ -364,18 +368,34 @@ public class MVProvider {
     }
     List<RelationIdentifier> relatedTables = schema.getRelatedTables();
     for (RelationIdentifier relatedTable : relatedTables) {
+      SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
+          SegmentStatusManager.getValidAndInvalidSegmentsInfo(relatedTable);
       List<String> relatedTableSegmentList =
-          SegmentStatusManager.getValidSegmentList(relatedTable);
+          SegmentStatusManager.getValidSegmentList(validAndInvalidSegmentsInfo);
       if (!relatedTableSegmentList.isEmpty()) {
         if (viewSegmentMap.isEmpty()) {
           isViewCanBeEnabled = false;
         } else {
-          isViewCanBeEnabled = viewSegmentMap.get(
-              relatedTable.getDatabaseName() + CarbonCommonConstants.POINT +
-                  relatedTable.getTableName()).containsAll(relatedTableSegmentList);
+          String tableUniqueName =
+              relatedTable.getDatabaseName() + CarbonCommonConstants.POINT + relatedTable
+                  .getTableName();
+          isViewCanBeEnabled =
+              viewSegmentMap.get(tableUniqueName).containsAll(relatedTableSegmentList);
+          if (!isViewCanBeEnabled) {
+            // in case if main table is compacted and mv table mapping is not updated,
+            // check from merged Load Mapping
+            isViewCanBeEnabled = viewSegmentMap.get(tableUniqueName).containsAll(
+                relatedTableSegmentList.stream()
+                    .map(validAndInvalidSegmentsInfo.getMergedLoadMapping()::get)
+                    .flatMap(Collection::stream).collect(Collectors.toList()));
+          }
         }
       }
     }
+    if (!isViewCanBeEnabled) {
+      LOG.error("MV `" + schema.getIdentifier().getTableName()
+          + "` is not in Sync with its related tables. Refresh MV to sync it.");
+    }
     return isViewCanBeEnabled;
   }
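
The fallback works by expanding each valid main-table segment through the merged-load
mapping before re-checking coverage against the MV's segment map. A standalone sketch with
made-up segment names (like the stream pipeline in the patch, it expects every segment it
expands to be present in the mapping):

    import java.util.*;
    import java.util.stream.Collectors;

    public class CompactedSyncCheckSketch {
      public static void main(String[] args) {
        // segments the MV has already consumed from the main table (hypothetical)
        Set<String> viewSegments = new HashSet<>(Arrays.asList("0", "1", "2", "3"));
        // the main table's only valid segment after compaction
        List<String> mainTableSegments = Collections.singletonList("0.1");
        // merged-load mapping produced while reading the table status
        Map<String, List<String>> mergedLoadMapping =
            Collections.singletonMap("0.1", Arrays.asList("0", "1", "2", "3"));

        boolean inSync = viewSegments.containsAll(mainTableSegments);
        if (!inSync) {
          // the direct check fails after compaction, so expand each merged
          // segment back into its source loads and compare those instead
          inSync = viewSegments.containsAll(
              mainTableSegments.stream()
                  .map(mergedLoadMapping::get)
                  .flatMap(Collection::stream)
                  .collect(Collectors.toList()));
        }
        System.out.println(inSync); // true: {0,1,2,3} covers the loads merged into 0.1
      }
    }
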
 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index c56b7e1..8707fab 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -82,6 +82,10 @@ case class MVCatalogInSpark(session: SparkSession)
     enabledSchemas.toArray
   }
 
+  def isMVInSync(mvSchema: MVSchema): Boolean = {
+    viewManager.isMVInSyncWithParentTables(mvSchema)
+  }
+
   /**
    * Registers the data produced by the logical representation of the given [[DataFrame]]. Unlike
    * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index 676c135..3300629 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -31,12 +31,7 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
   override def getDatabases: util.List[String] = {
     CarbonThreadUtil.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
     try {
-      val databaseList = session.catalog.listDatabases()
-      val databaseNameList = new util.ArrayList[String]()
-      for (database <- databaseList.collect()) {
-        databaseNameList.add(database.name)
-      }
-      databaseNameList
+      session.sessionState.catalog.listDatabases().asJava
     } finally {
       CarbonThreadUtil.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index 31b5548..545ccfc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.ICarbonLock
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager.ValidAndInvalidSegmentsInfo
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -111,8 +112,10 @@ object MVRefresher {
           val relatedTableIds =
             viewSchema.getRelatedTables.asScala.filter(_.isCarbonDataTable)
           for (relatedTableId <- relatedTableIds) {
-            val relatedTableSegmentList: util.List[String] =
-              SegmentStatusManager.getValidSegmentList(relatedTableId)
+            val validAndInvalidSegmentsInfo =
+              SegmentStatusManager.getValidAndInvalidSegmentsInfo(relatedTableId)
+            val relatedTableSegmentList: util.List[String] = SegmentStatusManager
+              .getValidSegmentList(validAndInvalidSegmentsInfo)
             if (relatedTableSegmentList.isEmpty) {
               return false
             }
@@ -236,8 +239,10 @@ object MVRefresher {
     if (listOfLoadFolderDetails.isEmpty) {
       // If segment Map is empty, load all valid segments from main tables to mv
       for (relationIdentifier <- relationIdentifiers.asScala) {
-        val mainTableSegmentList: util.List[String] = SegmentStatusManager.getValidSegmentList(
-          relationIdentifier)
+        val validAndInvalidSegmentsInfo =
+          SegmentStatusManager.getValidAndInvalidSegmentsInfo(relationIdentifier)
+        val mainTableSegmentList: util.List[String] = SegmentStatusManager
+          .getValidSegmentList(validAndInvalidSegmentsInfo)
         // If mainTableSegmentList is empty, no need to trigger load command
         // TODO: handle in case of multiple tables load to mv table
         if (mainTableSegmentList.isEmpty) return false
@@ -249,19 +254,16 @@ object MVRefresher {
       for (relationIdentifier <- relationIdentifiers.asScala) {
         val segmentList: util.List[String] = new util.ArrayList[String]
         // Get all segments for parent relationIdentifier
-        val mainTableSegmentList: util.List[String] = SegmentStatusManager.getValidSegmentList(
-          relationIdentifier)
+        val validAndInvalidSegmentsInfo =
+          SegmentStatusManager.getValidAndInvalidSegmentsInfo(relationIdentifier)
+        val mainTableSegmentList: util.List[String] = SegmentStatusManager
+          .getValidSegmentList(validAndInvalidSegmentsInfo)
         var ifTableStatusUpdateRequired: Boolean = false
         for (loadMetaDetail <- listOfLoadFolderDetails.asScala) {
           if ((loadMetaDetail.getSegmentStatus eq SegmentStatus.SUCCESS) ||
               (loadMetaDetail.getSegmentStatus eq SegmentStatus.INSERT_IN_PROGRESS)) {
             val segmentMaps: util.Map[String, util.List[String]] =
               new Gson().fromJson(loadMetaDetail.getExtraInfo, classOf[util.Map[_, _]])
-            val mainTableMetaDataPath: String = CarbonTablePath.getMetadataPath(relationIdentifier
-              .getTablePath)
-            val parentTableLoadMetaDataDetails: Array[LoadMetadataDetails] = SegmentStatusManager
-              .readLoadMetadata(
-                mainTableMetaDataPath)
             val table: String = relationIdentifier.getDatabaseName + CarbonCommonConstants.POINT +
                                 relationIdentifier.getTableName
             for (segmentId <- mainTableSegmentList.asScala) {
@@ -270,7 +272,7 @@ object MVRefresher {
               // {0,1,2} segments of mainTable are compacted to 0.1. Then,
               // on next rebuild/load to mv, no need to load segment(0.1) again. Update the
               // segmentMapping of mv segment from {0,1,2} to {0.1}
-              if (!checkIfSegmentsToBeReloaded(parentTableLoadMetaDataDetails,
+              if (!checkIfSegmentsToBeReloaded(validAndInvalidSegmentsInfo,
                 segmentMaps.get(table),
                 segmentId)) {
                 ifTableStatusUpdateRequired = true
@@ -350,16 +352,14 @@ object MVRefresher {
   /**
    * This method checks if mv table segment has to be reloaded again or not
    */
-  private def checkIfSegmentsToBeReloaded(loadMetaDataDetails: Array[LoadMetadataDetails],
+  private def checkIfSegmentsToBeReloaded(validAndInvalidSegmentsInfo: ValidAndInvalidSegmentsInfo,
       segmentIds: util.List[String],
       segmentId: String): Boolean = {
     var isToBeLoadedAgain: Boolean = true
     val mergedSegments: util.List[String] = new util.ArrayList[String]
-    for (loadMetadataDetail <- loadMetaDataDetails) {
-      if (null != loadMetadataDetail.getMergedLoadName &&
-          loadMetadataDetail.getMergedLoadName.equalsIgnoreCase(segmentId)) {
-        mergedSegments.add(loadMetadataDetail.getLoadName)
-      }
+    val mergedLoadMapping = validAndInvalidSegmentsInfo.getMergedLoadMapping
+    if (!mergedLoadMapping.isEmpty && mergedLoadMapping.containsKey(segmentId)) {
+      mergedSegments.addAll(mergedLoadMapping.get(segmentId))
     }
     if (!mergedSegments.isEmpty && segmentIds.containsAll(mergedSegments)) {
       isToBeLoadedAgain = false
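
Since the mapping now travels with ValidAndInvalidSegmentsInfo, the reload check reduces to
a map lookup instead of a rescan of LoadMetadataDetails. A condensed standalone version of
the check, with hypothetical names (the boolean is derived exactly as in the Scala above):

    import java.util.*;

    public class ReloadCheckSketch {
      // false when segmentId is a merged load whose source loads are all
      // already mapped to the MV segment, i.e. no reload is needed
      static boolean checkIfSegmentsToBeReloaded(Map<String, List<String>> mergedLoadMapping,
          List<String> mappedSegmentIds, String segmentId) {
        List<String> mergedSegments =
            mergedLoadMapping.getOrDefault(segmentId, Collections.emptyList());
        return mergedSegments.isEmpty() || !mappedSegmentIds.containsAll(mergedSegments);
      }

      public static void main(String[] args) {
        Map<String, List<String>> mapping =
            Collections.singletonMap("0.1", Arrays.asList("0", "1", "2"));
        // the MV already loaded {0,1,2}, so compacted segment 0.1 needs no reload
        System.out.println(
            checkIfSegmentsToBeReloaded(mapping, Arrays.asList("0", "1", "2"), "0.1")); // false
        // segment 3 was never merged, so it must be loaded
        System.out.println(
            checkIfSegmentsToBeReloaded(mapping, Arrays.asList("0", "1", "2"), "3")); // true
      }
    }
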
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
index 10b793b..12fe9bd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
@@ -42,6 +42,10 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def apply(logicalPlan: LogicalPlan): LogicalPlan = {
+    // check if query needs to be rewritten with mv
+    if (!CarbonProperties.getInstance().isMVEnabled) {
+      return logicalPlan
+    }
     // only query need to check this rule
     logicalPlan match {
       case _: Command => return logicalPlan
@@ -101,9 +105,6 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
     if (!canApply) {
       return logicalPlan
     }
-    if (!CarbonProperties.getInstance().isMVEnabled) {
-      return logicalPlan
-    }
     val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
     if (viewCatalog != null && hasSuitableMV(logicalPlan, viewCatalog)) {
       LOGGER.debug(s"Query Rewrite has been initiated for the plan: " +
@@ -173,17 +174,17 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
    * Whether the plan is valid for doing modular plan matching and mv replacing.
    */
   private def hasSuitableMV(logicalPlan: LogicalPlan,
-      catalog: MVCatalogInSpark): Boolean = {
+      mvCatalog: MVCatalogInSpark): Boolean = {
     if (!logicalPlan.isInstanceOf[Command] && !logicalPlan.isInstanceOf[DeserializeToObject]) {
-      val catalogs = logicalPlan collect {
+      val catalogTables = logicalPlan collect {
         case relation: LogicalRelation if relation.catalogTable.isDefined => relation.catalogTable
         case relation: HiveTableRelation => Option(relation.tableMeta)
       }
-      val validSchemas = catalog.getValidSchemas()
-      catalogs.nonEmpty &&
-      !isRewritten(validSchemas, catalogs) &&
-      !isRelatedTableSegmentsSetAsInput(catalogs) &&
-      isRelated(validSchemas, catalogs)
+      val validSchemas = mvCatalog.getValidSchemas()
+      catalogTables.nonEmpty &&
+      !isRewritten(validSchemas, catalogTables) &&
+      !isRelatedTableSegmentsSetAsInput(catalogTables) &&
+      isRelatedAndSyncWithParentTables(mvCatalog, validSchemas, catalogTables)
     } else {
       false
     }
@@ -215,8 +216,9 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
    *
    * @return
    */
-  private def isRelated(mvSchemas: Array[MVSchemaWrapper],
-                        tables: Seq[Option[CatalogTable]]): Boolean = {
+  private def isRelatedAndSyncWithParentTables(catalog: MVCatalogInSpark,
+      mvSchemas: Array[MVSchemaWrapper],
+      tables: Seq[Option[CatalogTable]]): Boolean = {
     tables.exists {
       table =>
         mvSchemas.exists {
@@ -225,7 +227,7 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
             mvIdentifier =>
               mvIdentifier.getTableName.equals(table.get.identifier.table) &&
               mvIdentifier.getDatabaseName.equals(table.get.database)
-          }
+          } && catalog.isMVInSync(mvSchema.viewSchema)
         }
     }
   }
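
Relocating the isMVEnabled check means a disabled deployment gets its plan back before any
MV catalog loading or plan matching happens. A schematic Java sketch of that guard ordering,
with stand-in types rather than Spark's Rule API:

    import java.util.function.UnaryOperator;

    public class EarlyExitRuleSketch {
      // stand-in for the rewrite rule: the cheap feature-flag check runs first,
      // so disabled deployments never pay for catalog lookups or plan analysis
      static UnaryOperator<String> mvRewriteRule(boolean mvEnabled) {
        return plan -> {
          if (!mvEnabled) {
            return plan; // plan returned untouched, as in the relocated check above
          }
          return expensiveRewrite(plan);
        };
      }

      static String expensiveRewrite(String plan) {
        return plan + " [rewritten against a suitable MV]";
      }

      public static void main(String[] args) {
        System.out.println(mvRewriteRule(false).apply("Aggregate(...)"));
        System.out.println(mvRewriteRule(true).apply("Aggregate(...)"));
      }
    }
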
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index e8946b9..9339e5b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -522,9 +522,11 @@ public class SortParameters implements Serializable {
           columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
       parameters.setDictSortColIdxSchemaOrderMapping(
           columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
+      parameters.setNoDictSchemaDataType(CarbonDataProcessorUtil
+          .getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
       parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
       parameters.setNoDictDataType(CarbonDataProcessorUtil
-          .getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
+          .getNoDictSortDataTypesAsDataFieldOrder(configuration.getDataFields()));
       Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
           .getNoDictSortAndNoSortDataTypesAsDataFieldOrder(configuration.getDataFields());
       parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index d8f8673..3a68410 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -396,12 +396,28 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * get visible no dictionary dimensions as per data field order
+   * Get visible no dictionary dimensions as per data field order
+   */
+  public static DataType[] getNoDictDataTypesAsDataFieldOrder(DataField[] dataFields) {
+    List<DataType> type = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
+        if (!dataField.getColumn().hasEncoding(Encoding.DICTIONARY)
+            && dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
+          type.add(dataField.getColumn().getColumnSchema().getDataType());
+        }
+      }
+    }
+    return type.toArray(new DataType[type.size()]);
+  }
+
+  /**
+   * get visible no dictionary sort dimensions as per data field order
    *
    * @param dataFields
    * @return
    */
-  public static DataType[] getNoDictDataTypesAsDataFieldOrder(DataField[] dataFields) {
+  public static DataType[] getNoDictSortDataTypesAsDataFieldOrder(DataField[] dataFields) {
     List<DataType> type = new ArrayList<>();
     for (DataField dataField : dataFields) {
       if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
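
The split between the two helpers is easiest to see side by side: getNoDictDataTypesAsDataFieldOrder
keeps every visible no-dictionary dimension, while the renamed getNoDictSortDataTypesAsDataFieldOrder
presumably keeps the old sort-column filter (its body is truncated in this hunk). A toy sketch with
stand-in field objects rather than CarbonData's DataField API:

    import java.util.*;

    public class NoDictTypeSplitSketch {
      // minimal stand-in for a CarbonData column: data type plus two flags
      static final class Field {
        final String dataType; final boolean noDictionary; final boolean sortColumn;
        Field(String dataType, boolean noDictionary, boolean sortColumn) {
          this.dataType = dataType; this.noDictionary = noDictionary; this.sortColumn = sortColumn;
        }
      }

      // all visible no-dictionary dimensions, in field order (the new helper)
      static List<String> noDictTypes(List<Field> fields) {
        List<String> types = new ArrayList<>();
        for (Field f : fields) {
          if (f.noDictionary) types.add(f.dataType);
        }
        return types;
      }

      // only the no-dictionary sort dimensions (the renamed helper, assuming a sort filter)
      static List<String> noDictSortTypes(List<Field> fields) {
        List<String> types = new ArrayList<>();
        for (Field f : fields) {
          if (f.noDictionary && f.sortColumn) types.add(f.dataType);
        }
        return types;
      }

      public static void main(String[] args) {
        List<Field> fields = Arrays.asList(
            new Field("STRING", true, true),   // no-dict sort column
            new Field("VARCHAR", true, false), // no-dict, not a sort column
            new Field("DATE", false, false));  // dictionary-encoded date
        System.out.println(noDictTypes(fields));     // [STRING, VARCHAR]
        System.out.println(noDictSortTypes(fields)); // [STRING]
      }
    }
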

