carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kunalkap...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4077] Refactor and Fix Insert into partition issue with FileMergeSortComparator
Date Thu, 17 Dec 2020 11:57:21 GMT
This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 29ecd5f  [CARBONDATA-4077] Refactor and Fix Insert into partition issue with FileMergeSortComparator
29ecd5f is described below

commit 29ecd5fee7af97633b756bfd944251a18dc76c18
Author: Indhumathi27 <indhumathim27@gmail.com>
AuthorDate: Sat Dec 5 19:35:58 2020 +0530

    [CARBONDATA-4077] Refactor and Fix Insert into partition issue with FileMergeSortComparator
    
    Why is this PR needed?
    The changes in PR-3995 missed the insert-into-partition flow. In addition, using a Map to
    look up Dict/No-Dict sort column info during the final sort task degrades load performance
    when the number of sort columns is large.
    
    What changes were proposed in this PR?
    Handled the insert-into-partition flow.
    Refactored the code to use lists of only the Dict/No-Dict sort column indexes instead of
    a Map, to fix the performance issue.
    
    This closes #4039
---
 .../query/SecondaryIndexQueryResultProcessor.java  |   2 -
 .../unsafe/holder/UnsafeFinalMergePageHolder.java  |   6 +-
 .../sort/unsafe/holder/UnsafeInmemoryHolder.java   |   6 +-
 .../holder/UnsafeSortTempFileChunkHolder.java      |   6 +-
 .../merger/CompactionResultSortProcessor.java      |  11 +-
 .../sort/sortdata/FileMergeSortComparator.java     | 100 ++++++++--------
 .../processing/sort/sortdata/SortParameters.java   |  69 +++++++----
 .../sort/sortdata/SortTempFileChunkHolder.java     |  12 +-
 .../processing/sort/sortdata/TableFieldStat.java   |  27 +++--
 .../processing/util/CarbonDataProcessorUtil.java   | 127 ++++++++++++++-------
 .../sort/sortdata/FileMergeSortComparatorTest.java |  27 ++---
 11 files changed, 239 insertions(+), 154 deletions(-)

diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 4d045f0..41a5c43 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -528,8 +528,6 @@ public class SecondaryIndexQueryResultProcessor {
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     sortParameters.setNoDictionarySortColumn(
         CarbonDataProcessorUtil.getNoDictSortColMapping(indexTable));
-    sortParameters.setSortColumnSchemaOrderMap(
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(indexTable));
     finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation,
         indexTable.getTableName(), sortParameters);
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 717bb91..7abd3a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -61,10 +61,10 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
     }
     this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-        tableFieldStat.getNoDictSchemaDataType(),
+    this.comparator = new FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
         tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-        tableFieldStat.getSortColSchemaOrderMap());
+        tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+        tableFieldStat.getDictSortColIdxSchemaOrderMapping());
   }
 
   public boolean hasNext() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index a46811f..a21e802 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -48,10 +48,10 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
     this.rowPage = rowPage;
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator =
-        new FileMergeSortComparator(rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
-            rowPage.getTableFieldStat().getNoDictSchemaDataType(),
+        new FileMergeSortComparator(rowPage.getTableFieldStat().getNoDictSchemaDataType(),
             rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping(),
-            rowPage.getTableFieldStat().getSortColSchemaOrderMap());
+            rowPage.getTableFieldStat().getNoDictSortColIdxSchemaOrderMapping(),
+            rowPage.getTableFieldStat().getDictSortColIdxSchemaOrderMapping());
     this.rowPage.setReadConvertedNoSortField();
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 5144f57..77b19f4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -120,10 +120,10 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
       comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
           parameters.getNoDictDataType());
     } else {
-      this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-          tableFieldStat.getNoDictSchemaDataType(),
+      this.comparator = new FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
           tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-          tableFieldStat.getSortColSchemaOrderMap());
+          tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+          tableFieldStat.getDictSortColIdxSchemaOrderMapping());
     }
     initialize();
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 9cfda1d..3f27eee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -480,8 +481,14 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
         .getNoDictSortColMapping(carbonTable);
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
-    sortParameters.setSortColumnSchemaOrderMap(
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable));
+    Map<String, int[]> columnIdxMap = CarbonDataProcessorUtil
+        .getColumnIdxBasedOnSchemaInRow(carbonTable);
+    sortParameters.setNoDictSortColumnSchemaOrderMapping(
+        columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+    sortParameters.setNoDictSortColIdxSchemaOrderMapping(
+        columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+    sortParameters.setDictSortColIdxSchemaOrderMapping(
+        columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
index 687af7b..337bf73 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
@@ -18,8 +18,6 @@
 package org.apache.carbondata.processing.sort.sortdata;
 
 import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -29,8 +27,6 @@ import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 
 public class FileMergeSortComparator implements Comparator<IntermediateSortTempRow> {
 
-  private boolean[] isSortColumnNoDictionary;
-
   /**
    * Datatype of all the no-dictionary columns in the table in schema order
    */
@@ -42,21 +38,25 @@ public class FileMergeSortComparator implements Comparator<IntermediateSortTempR
   private int[] noDictPrimitiveIndex;
 
   /**
-   * Sort and Dictionary info of all the columns in schema order
+   * Index of the No-Dict Sort columns in schema order for final merge step of sorting.
+   */
+  private int[] noDictSortColIdxSchemaOrderMapping;
+
+  /**
+   * Index of the dict Sort columns in schema order for final merge step of sorting.
    */
-  private final Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
+  private int[] dictSortColIdxSchemaOrderMapping;
 
   /**
    * Comparator for IntermediateSortTempRow for compatibility cases where column added in old
    * version and it is sort column
-   * @param isSortColumnNoDictionary isSortColumnNoDictionary
    */
-  public FileMergeSortComparator(boolean[] isSortColumnNoDictionary, DataType[] noDictDataTypes,
-      int[] columnIdBasedOnSchemaInRow, Map<Integer, List<Boolean>> sortColumnSchemaOrderMap) {
-    this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+  public FileMergeSortComparator(DataType[] noDictDataTypes, int[] columnIdBasedOnSchemaInRow,
+      int[] noDictSortColIdxSchemaOrderMapping, int[] dictSortColIdxSchemaOrderMapping) {
     this.noDictDataTypes = noDictDataTypes;
     this.noDictPrimitiveIndex = columnIdBasedOnSchemaInRow;
-    this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+    this.noDictSortColIdxSchemaOrderMapping = noDictSortColIdxSchemaOrderMapping;
+    this.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
   }
 
   /**
@@ -68,8 +68,8 @@ public class FileMergeSortComparator implements Comparator<IntermediateSortTempR
    * 1. only Dictionary sort column data
    * 2. dictionary sort and dictionary no-sort column data
    * On this temp data row, we have to identify the sort column data and only compare those.
-   * From the sortColumnSchemaOrderMap, get the sort column and dict info. If the column
-   * is sort column, then perform the comparison, else increment the respective index.
+   * Use noDictSortColIdxSchemaOrderMapping and dictSortColIdxSchemaOrderMapping to get
+   * the indexes of sort column in schema order
    */
   @Override
   public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) {
@@ -78,51 +78,47 @@ public class FileMergeSortComparator implements Comparator<IntermediateSortTempR
     int nonDictIndex = 0;
     int noDicTypeIdx = 0;
     int schemaRowIdx = 0;
-    int sortIndex = 0;
-
-    for (Map.Entry<Integer, List<Boolean>> schemaEntry : sortColumnSchemaOrderMap.entrySet()) {
-      boolean isSortColumn = schemaEntry.getValue().get(0);
-      boolean isDictColumn = schemaEntry.getValue().get(1);
-      if (isSortColumn) {
-        if (isSortColumnNoDictionary[sortIndex++]) {
-          if (DataTypeUtil.isPrimitiveColumn(noDictDataTypes[noDicTypeIdx])) {
-            // use data types based comparator for the no dictionary measure columns
-            SerializableComparator comparator =
-                org.apache.carbondata.core.util.comparator.Comparator
-                    .getComparator(noDictDataTypes[noDicTypeIdx]);
-            int difference = comparator
-                .compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
-                    rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
-            schemaRowIdx++;
-            if (difference != 0) {
-              return difference;
-            }
-          } else {
-            byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
-            byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
 
-            int difference = ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-            if (difference != 0) {
-              return difference;
-            }
-          }
-        } else {
-          int dimFieldA = rowA.getDictSortDims()[dictIndex];
-          int dimFieldB = rowB.getDictSortDims()[dictIndex];
+    // compare no-Dict sort columns
+    for (int i = 0; i < noDictSortColIdxSchemaOrderMapping.length; i++) {
+      if (DataTypeUtil
+          .isPrimitiveColumn(noDictDataTypes[noDictSortColIdxSchemaOrderMapping[noDicTypeIdx]])) {
+        // use data types based comparator for the no dictionary measure columns
+        SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+            .getComparator(noDictDataTypes[noDictSortColIdxSchemaOrderMapping[noDicTypeIdx]]);
+        int difference = comparator
+            .compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
+                rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
+        schemaRowIdx++;
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        byte[] byteArr1 =
+            (byte[]) rowA.getNoDictSortDims()[noDictSortColIdxSchemaOrderMapping[nonDictIndex]];
+        byte[] byteArr2 =
+            (byte[]) rowB.getNoDictSortDims()[noDictSortColIdxSchemaOrderMapping[nonDictIndex]];
 
-          diff = dimFieldA - dimFieldB;
-          if (diff != 0) {
-            return diff;
-          }
+        int difference = ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
         }
       }
-      if (isDictColumn) {
-        dictIndex++;
-      } else {
-        nonDictIndex++;
-        noDicTypeIdx++;
+      nonDictIndex++;
+      noDicTypeIdx++;
+    }
+    // compare dict sort columns
+    for (int i = 0; i < dictSortColIdxSchemaOrderMapping.length; i++) {
+      int dimFieldA = rowA.getDictSortDims()[dictSortColIdxSchemaOrderMapping[dictIndex]];
+      int dimFieldB = rowB.getDictSortDims()[dictSortColIdxSchemaOrderMapping[dictIndex]];
+      dictIndex++;
+
+      diff = dimFieldA - dimFieldB;
+      if (diff != 0) {
+        return diff;
       }
     }
+
     return diff;
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index fc93d52..e8946b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.File;
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -108,9 +107,6 @@ public class SortParameters implements Serializable {
   // used while performing final sort of intermediate files
   private DataType[] noDictSchemaDataType;
 
-  // Sort and Dictionary info of all the columns in schema order, used in final sort
-  private Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
-
   /**
    * To know how many columns are of high cardinality.
    */
@@ -167,6 +163,16 @@ public class SortParameters implements Serializable {
    */
   private int[] noDictSortColumnSchemaOrderMapping;
 
+  /**
+   * Index of the no dict Sort columns in schema order used for final merge step of sorting.
+   */
+  private int[] noDictSortColIdxSchemaOrderMapping;
+
+  /**
+   * Index of the dict Sort columns in schema order for final merge step of sorting.
+   */
+  private int[] dictSortColIdxSchemaOrderMapping;
+
   private boolean isInsertWithoutReArrangeFlow;
 
   public SortParameters getCopy() {
@@ -209,7 +215,8 @@ public class SortParameters implements Serializable {
     parameters.noDictSortColumnSchemaOrderMapping = noDictSortColumnSchemaOrderMapping;
     parameters.isInsertWithoutReArrangeFlow = isInsertWithoutReArrangeFlow;
     parameters.noDictSchemaDataType = noDictSchemaDataType;
-    parameters.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+    parameters.noDictSortColIdxSchemaOrderMapping = noDictSortColIdxSchemaOrderMapping;
+    parameters.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
     return parameters;
   }
 
@@ -417,7 +424,7 @@ public class SortParameters implements Serializable {
     return noDictSortColumnSchemaOrderMapping;
   }
 
-  private void setNoDictSortColumnSchemaOrderMapping(int[] noDictSortColumnSchemaOrderMapping) {
+  public void setNoDictSortColumnSchemaOrderMapping(int[] noDictSortColumnSchemaOrderMapping) {
     this.noDictSortColumnSchemaOrderMapping = noDictSortColumnSchemaOrderMapping;
   }
 
@@ -507,8 +514,14 @@ public class SortParameters implements Serializable {
       parameters.setInsertWithoutReArrangeFlow(true);
       parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
           .getNoDictSortColMappingAsDataFieldOrder(configuration.getDataFields()));
-      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-          .getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields()));
+      Map<String, int[]> columnIdxMap = CarbonDataProcessorUtil
+          .getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields());
+      parameters.setNoDictSortColumnSchemaOrderMapping(
+          columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+      parameters.setNoDictSortColIdxSchemaOrderMapping(
+          columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+      parameters.setDictSortColIdxSchemaOrderMapping(
+          columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
       parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
       parameters.setNoDictDataType(CarbonDataProcessorUtil
           .getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
@@ -525,13 +538,17 @@ public class SortParameters implements Serializable {
     } else {
       parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
           .getNoDictSortColMapping(parameters.getCarbonTable()));
-      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-          .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+      Map<String, int[]> columnIdxMap =
+          CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable());
+      parameters
+          .setNoDictSortColumnSchemaOrderMapping(columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+      parameters.setNoDictSortColIdxSchemaOrderMapping(
+          columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+      parameters
+          .setDictSortColIdxSchemaOrderMapping(columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
       parameters.setMeasureDataType(configuration.getMeasureDataType());
       parameters.setNoDictDataType(CarbonDataProcessorUtil
           .getNoDictSortDataTypes(configuration.getTableSpec().getCarbonTable()));
-      parameters.setSortColumnSchemaOrderMap(
-          CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
       parameters.setNoDictSchemaDataType(
           CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
       Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
@@ -627,12 +644,16 @@ public class SortParameters implements Serializable {
     parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
         .getNoDictSortColMapping(parameters.getCarbonTable()));
-    parameters.setSortColumnSchemaOrderMap(
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
     parameters.setNoDictSchemaDataType(
         CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
-    parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-        .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+    Map<String, int[]> columnIdxMap =
+        CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable());
+    parameters
+        .setNoDictSortColumnSchemaOrderMapping(columnIdxMap.get("columnIdxBasedOnSchemaInRow"));
+    parameters
+        .setNoDictSortColIdxSchemaOrderMapping(columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow"));
+    parameters
+        .setDictSortColIdxSchemaOrderMapping(columnIdxMap.get("dictSortIdxBasedOnSchemaInRow"));
     TableSpec tableSpec = new TableSpec(carbonTable, false);
     parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
     parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
@@ -713,11 +734,19 @@ public class SortParameters implements Serializable {
     this.noDictSchemaDataType = noDictSchemaDataType;
   }
 
-  public void setSortColumnSchemaOrderMap(Map<Integer, List<Boolean>> sortColumnSchemaOrderMap) {
-    this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+  public int[] getNoDictSortColIdxSchemaOrderMapping() {
+    return noDictSortColIdxSchemaOrderMapping;
+  }
+
+  public void setNoDictSortColIdxSchemaOrderMapping(int[] noDictSortColIdxSchemaOrderMapping) {
+    this.noDictSortColIdxSchemaOrderMapping = noDictSortColIdxSchemaOrderMapping;
+  }
+
+  public int[] getDictSortColIdxSchemaOrderMapping() {
+    return dictSortColIdxSchemaOrderMapping;
   }
 
-  public Map<Integer, List<Boolean>> getSortColumnSchemaOrderMap() {
-    return sortColumnSchemaOrderMap;
+  public void setDictSortColIdxSchemaOrderMapping(int[] dictSortColIdxSchemaOrderMapping) {
+    this.dictSortColIdxSchemaOrderMapping = dictSortColIdxSchemaOrderMapping;
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 84fb4d0..f4cfe69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -105,10 +105,10 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
   public SortTempFileChunkHolder(SortParameters sortParameters) {
     this.tableFieldStat = new TableFieldStat(sortParameters);
-    this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-        tableFieldStat.getNoDictSchemaDataType(),
+    this.comparator = new FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
         tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-        tableFieldStat.getSortColSchemaOrderMap());
+        tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+        tableFieldStat.getDictSortColIdxSchemaOrderMapping());
     this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
@@ -132,10 +132,10 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
                 true));
     this.convertToActualField = convertToActualField;
     if (this.convertToActualField) {
-      this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-          tableFieldStat.getNoDictSchemaDataType(),
+      this.comparator = new FileMergeSortComparator(tableFieldStat.getNoDictSchemaDataType(),
           tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
-          tableFieldStat.getSortColSchemaOrderMap());
+          tableFieldStat.getNoDictSortColIdxSchemaOrderMapping(),
+          tableFieldStat.getDictSortColIdxSchemaOrderMapping());
     } else {
       this.comparator =
           new IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index df7b873..e317d56 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -83,7 +82,15 @@ public class TableFieldStat implements Serializable {
 
   private DataType[] noDictSchemaDataType;
 
-  private Map<Integer, List<Boolean>> sortColSchemaOrderMap;
+  /**
+   * Index of the no dict Sort columns in schema order used for final merge step of sorting.
+   */
+  private int[] noDictSortColIdxSchemaOrderMapping;
+
+  /**
+   * Index of the dict Sort columns in schema order for final merge step of sorting.
+   */
+  private int[] dictSortColIdxSchemaOrderMapping;
 
   public TableFieldStat(SortParameters sortParameters) {
     int noDictDimCnt = sortParameters.getNoDictionaryCount();
@@ -106,7 +113,9 @@ public class TableFieldStat implements Serializable {
     this.noDictSortDataType = sortParameters.getNoDictSortDataType();
     this.noDictNoSortDataType = sortParameters.getNoDictNoSortDataType();
     this.noDictSchemaDataType = sortParameters.getNoDictSchemaDataType();
-    this.sortColSchemaOrderMap = sortParameters.getSortColumnSchemaOrderMap();
+    this.noDictSortColIdxSchemaOrderMapping =
+        sortParameters.getNoDictSortColIdxSchemaOrderMapping();
+    this.dictSortColIdxSchemaOrderMapping = sortParameters.getDictSortColIdxSchemaOrderMapping();
     for (boolean flag : isVarcharDimFlags) {
       if (flag) {
         varcharDimCnt++;
@@ -359,11 +368,15 @@ public class TableFieldStat implements Serializable {
     return otherCols;
   }
 
-  public Map<Integer, List<Boolean>> getSortColSchemaOrderMap() {
-    return sortColSchemaOrderMap;
-  }
-
   public DataType[] getNoDictSchemaDataType() {
     return noDictSchemaDataType;
   }
+
+  public int[] getNoDictSortColIdxSchemaOrderMapping() {
+    return noDictSortColIdxSchemaOrderMapping;
+  }
+
+  public int[] getDictSortColIdxSchemaOrderMapping() {
+    return dictSortColIdxSchemaOrderMapping;
+  }
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index ceaa05c..d8f8673 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -441,38 +441,6 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * Get the sort/no_sort column map based on schema order.
-   * This will be used in the final sort step to find the index of sort column, to compare the
-   * intermediate row data based on schema.
-   */
-  public static Map<Integer, List<Boolean>> getSortColSchemaOrderMapping(CarbonTable carbonTable) {
-    List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
-    Map<Integer, List<Boolean>> sortColSchemaOrderMap = new HashMap<>();
-    for (CarbonDimension dimension : dimensions) {
-      List<Boolean> sortDictOrNoDictMap = new ArrayList<>();
-      // check if the dimension is sort column or not and add to first index of sortDictOrNoDictMap
-      // check if the dimension is dict column or not and add to second index of sortDictOrNoDictMap
-      if (dimension.isSortColumn()) {
-        sortDictOrNoDictMap.add(true);
-        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
-          sortDictOrNoDictMap.add(true);
-        } else {
-          sortDictOrNoDictMap.add(false);
-        }
-      } else {
-        sortDictOrNoDictMap.add(false);
-        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
-          sortDictOrNoDictMap.add(true);
-        } else {
-          sortDictOrNoDictMap.add(false);
-        }
-      }
-      sortColSchemaOrderMap.put(dimension.getOrdinal(), sortDictOrNoDictMap);
-    }
-    return sortColSchemaOrderMap;
-  }
-
-  /**
    * get mapping based on data fields order
    *
    * @param dataFields
@@ -504,14 +472,26 @@ public final class CarbonDataProcessorUtil {
    * initial sorting, carbonrow will be in order where added sort column is at the beginning, But
    * before final merger of sort, the data should be in schema order
    * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the carbonRow in schema
-   * order), so This method helps to find the index of no dictionary sort column in the carbonrow
-   * data.
+   * order), so This method helps to find the index of no dictionary/ dictionary sort column in
+   * the carbonrow data.
    */
-  public static int[] getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
+  public static Map<String, int[]> getColumnIdxBasedOnSchemaInRow(CarbonTable carbonTable) {
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
     List<Integer> noDicSortColMap = new ArrayList<>();
+    // get no-dict / dict sort dimensions
+    List<CarbonDimension> noDictSortDimensions = new ArrayList<>();
+    List<CarbonDimension> dictSortDimensions = new ArrayList<>();
+
+    List<Integer> noDictSortColIdx = new ArrayList<>();
+    List<Integer> dictSortColIdx = new ArrayList<>();
+
     int counter = 0;
     for (CarbonDimension dimension : dimensions) {
+      if (dimension.hasEncoding(Encoding.DICTIONARY) || dimension.getDataType() == DataTypes.DATE) {
+        dictSortDimensions.add(dimension);
+      } else {
+        noDictSortDimensions.add(dimension);
+      }
       if (dimension.getDataType() == DataTypes.DATE) {
         continue;
       }
@@ -520,12 +500,39 @@ public final class CarbonDataProcessorUtil {
       }
       counter++;
     }
+    // add no-Dict sort column index
+    for (int i = 0; i < noDictSortDimensions.size(); i++) {
+      if (noDictSortDimensions.get(i).isSortColumn()) {
+        noDictSortColIdx.add(i);
+      }
+    }
+    // add dict sort column index
+    for (int i = 0; i < dictSortDimensions.size(); i++) {
+      if (dictSortDimensions.get(i).isSortColumn()) {
+        dictSortColIdx.add(i);
+      }
+    }
     Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
     int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
     for (int i = 0; i < mapping.length; i++) {
       columnIdxBasedOnSchemaInRow[i] = mapping[i];
     }
-    return columnIdxBasedOnSchemaInRow;
+    Integer[] noDictSortIdx = noDictSortColIdx.toArray(new Integer[0]);
+    int[] noDictSortIdxBasedOnSchemaInRow = new int[noDictSortIdx.length];
+    for (int i = 0; i < noDictSortIdx.length; i++) {
+      noDictSortIdxBasedOnSchemaInRow[i] = noDictSortIdx[i];
+    }
+    Integer[] dictSortIdx = dictSortColIdx.toArray(new Integer[0]);
+    int[] dictSortIdxBasedOnSchemaInRow = new int[dictSortIdx.length];
+    for (int i = 0; i < dictSortIdx.length; i++) {
+      dictSortIdxBasedOnSchemaInRow[i] = dictSortIdx[i];
+    }
+
+    Map<String, int[]> dictOrNoDictSortInfoMap = new HashMap<>();
+    dictOrNoDictSortInfoMap.put("columnIdxBasedOnSchemaInRow", columnIdxBasedOnSchemaInRow);
+    dictOrNoDictSortInfoMap.put("noDictSortIdxBasedOnSchemaInRow", noDictSortIdxBasedOnSchemaInRow);
+    dictOrNoDictSortInfoMap.put("dictSortIdxBasedOnSchemaInRow", dictSortIdxBasedOnSchemaInRow);
+    return dictOrNoDictSortInfoMap;
   }
 
   /**
@@ -533,14 +540,27 @@ public final class CarbonDataProcessorUtil {
    * initial sorting, carbonrow will be in order where added sort column is at the beginning, But
    * before final merger of sort, the data should be in schema order
    * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the carbonRow in schema
-   * order), so This method helps to find the index of no dictionary sort column in the carbonrow
-   * data.
+   * order), so This method helps to find the index of no dictionary/ dictionary sort column in
+   * the carbonrow data.
    */
-  public static int[] getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(DataField[] dataFields) {
+  public static Map<String, int[]> getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(
+      DataField[] dataFields) {
     List<Integer> noDicSortColMap = new ArrayList<>();
     int counter = 0;
+    // get no-dict / dict sort column schema
+    List<CarbonColumn> noDictSortColumns = new ArrayList<>();
+    List<CarbonColumn> dictSortColumns = new ArrayList<>();
+
+    List<Integer> noDictSortColIdx = new ArrayList<>();
+    List<Integer> dictSortColIdx = new ArrayList<>();
     for (DataField dataField : dataFields) {
       if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().hasEncoding(Encoding.DICTIONARY)
+            || dataField.getColumn().getColumnSchema().getDataType() == DataTypes.DATE) {
+          dictSortColumns.add(dataField.getColumn());
+        } else {
+          noDictSortColumns.add(dataField.getColumn());
+        }
         if (dataField.getColumn().getColumnSchema().getDataType() == DataTypes.DATE) {
           continue;
         }
@@ -551,12 +571,39 @@ public final class CarbonDataProcessorUtil {
         counter++;
       }
     }
+    // add no-Dict sort column index
+    for (int i = 0; i < noDictSortColumns.size(); i++) {
+      if (noDictSortColumns.get(i).getColumnSchema().isSortColumn()) {
+        noDictSortColIdx.add(i);
+      }
+    }
+    // add dict sort column index
+    for (int i = 0; i < dictSortColumns.size(); i++) {
+      if (dictSortColumns.get(i).getColumnSchema().isSortColumn()) {
+        dictSortColIdx.add(i);
+      }
+    }
     Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
     int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
     for (int i = 0; i < mapping.length; i++) {
       columnIdxBasedOnSchemaInRow[i] = mapping[i];
     }
-    return columnIdxBasedOnSchemaInRow;
+    Integer[] noDictSortIdx = noDictSortColIdx.toArray(new Integer[0]);
+    int[] noDictSortIdxBasedOnSchemaInRow = new int[noDictSortIdx.length];
+    for (int i = 0; i < noDictSortIdx.length; i++) {
+      noDictSortIdxBasedOnSchemaInRow[i] = noDictSortIdx[i];
+    }
+    Integer[] dictSortIdx = dictSortColIdx.toArray(new Integer[0]);
+    int[] dictSortIdxBasedOnSchemaInRow = new int[dictSortIdx.length];
+    for (int i = 0; i < dictSortIdx.length; i++) {
+      dictSortIdxBasedOnSchemaInRow[i] = dictSortIdx[i];
+    }
+
+    Map<String, int[]> dictOrNoSortInfoMap = new HashMap<>();
+    dictOrNoSortInfoMap.put("columnIdxBasedOnSchemaInRow", columnIdxBasedOnSchemaInRow);
+    dictOrNoSortInfoMap.put("noDictSortIdxBasedOnSchemaInRow", noDictSortIdxBasedOnSchemaInRow);
+    dictOrNoSortInfoMap.put("dictSortIdxBasedOnSchemaInRow", dictSortIdxBasedOnSchemaInRow);
+    return dictOrNoSortInfoMap;
   }
 
   /**
diff --git a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
index 46c7e7b..f1d4e17 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
@@ -86,25 +86,20 @@ public class FileMergeSortComparatorTest {
         && noDictDataTypes[1].equals(DataTypes.STRING) && noDictDataTypes[2]
         .equals(DataTypes.LONG));
 
-    // test getSortColSchemaOrderMapping
-    Map<Integer, List<Boolean>> sortColSchemaOrderMapping =
-        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable);
-    assert (sortColSchemaOrderMapping.get(0).get(0).equals(false) && sortColSchemaOrderMapping
-        .get(0).get(1).equals(false));
-    assert (sortColSchemaOrderMapping.get(1).get(0).equals(true) && sortColSchemaOrderMapping.get(1)
-        .get(1).equals(false));
-    assert (sortColSchemaOrderMapping.get(2).get(0).equals(true) && sortColSchemaOrderMapping
-        .get(2).get(1).equals(false));
-    assert (sortColSchemaOrderMapping.get(3).get(0).equals(true) && sortColSchemaOrderMapping.get(3)
-        .get(1).equals(true));
-
     // test comparator
-    boolean[] isSortColNoDict = { true, false };
-    int[] columnIdxBasedOnSchemaInRow =
+    Map<String, int[]> columnIdxMap =
         CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(carbonTable);
+    int[] columnIdxBasedOnSchemaInRows = columnIdxMap.get("columnIdxBasedOnSchemaInRow");
+    int[] noDictSortIdxBasedOnSchemaInRows = columnIdxMap.get("noDictSortIdxBasedOnSchemaInRow");
+    int[] dictSortIdxBasedOnSchemaInRows = columnIdxMap.get("dictSortIdxBasedOnSchemaInRow");
+ 
+    assert (noDictSortIdxBasedOnSchemaInRows.length == 2 && noDictSortIdxBasedOnSchemaInRows[0] == 1
+        && noDictSortIdxBasedOnSchemaInRows[1] == 2);
+    assert (dictSortIdxBasedOnSchemaInRows.length == 1 && dictSortIdxBasedOnSchemaInRows[0] == 0);
+
     FileMergeSortComparator comparator =
-        new FileMergeSortComparator(isSortColNoDict, noDictDataTypes, columnIdxBasedOnSchemaInRow,
-            sortColSchemaOrderMapping);
+        new FileMergeSortComparator(noDictDataTypes, columnIdxBasedOnSchemaInRows,
+            noDictSortIdxBasedOnSchemaInRows, dictSortIdxBasedOnSchemaInRows);
 
     // prepare data for final sort
     int[] dictSortDims1 = { 1 };


Mime
View raw message