carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3371] Fix ArrayIndexOutOfBoundsException of compaction after sort_columns modification
Date Thu, 09 May 2019 11:00:17 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 24753a9  [CARBONDATA-3371] Fix ArrayIndexOutOfBoundsException of compaction after sort_columns modification
24753a9 is described below

commit 24753a924d8b6a4e01dfaff4f57db25626ba636d
Author: QiangCai <qiangcai@qq.com>
AuthorDate: Sun May 5 15:22:23 2019 +0800

    [CARBONDATA-3371] Fix ArrayIndexOutOfBoundsException of compaction after sort_columns modification
    
    Modification:
    
    1. SegmentPropertiesWrapper should check the column order for different segments,
       because a sort_columns modification can change the column order.
    2. dictionaryColumnChunkIndex of blockExecutionInfo should keep the projection
       order for compaction.
    3. If column drift happened, RawResultIterator should convert the drifted
       measures back to dimensions.
    
    This closes #3201
---
 .../block/SegmentPropertiesAndSchemaHolder.java    |  13 +--
 .../scan/executor/impl/AbstractQueryExecutor.java  |   6 +-
 .../core/scan/executor/util/QueryUtil.java         |   2 +-
 .../iterator/ColumnDriftRawResultIterator.java     | 128 +++++++++++++++++++++
 .../scan/result/iterator/RawResultIterator.java    |  12 +-
 .../core/scan/wrappers/ByteArrayWrapper.java       |   3 +
 .../TestAlterTableSortColumnsProperty.scala        |  92 ++++++++++-----
 .../carbondata/spark/rdd/StreamHandoffRDD.scala    |   2 +-
 .../merger/CarbonCompactionExecutor.java           |  20 +++-
 9 files changed, 225 insertions(+), 53 deletions(-)
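
For illustration, a minimal sketch (hypothetical names, not the project's code)
of the order-sensitive comparison the SegmentPropertiesWrapper fix relies on:
once sort_columns can be altered, two segments with the same columns in a
different order must not share a cached SegmentProperties entry, so equals()
compares position by position and hashCode() folds the column ids in order:

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical, simplified stand-in for the segment schema cache key.
    public class OrderSensitiveKey {
      private final List<String> columnUniqueIds;

      public OrderSensitiveKey(List<String> columnUniqueIds) {
        this.columnUniqueIds = columnUniqueIds;
      }

      // Compare position by position; no sorting, so column order matters.
      @Override public boolean equals(Object o) {
        return o instanceof OrderSensitiveKey
            && ((OrderSensitiveKey) o).columnUniqueIds.equals(columnUniqueIds);
      }

      // Fold the ids in order so a reordering changes the hash, mirroring
      // the StringBuilder of column unique ids in the hunk below.
      @Override public int hashCode() {
        StringBuilder order = new StringBuilder();
        for (String id : columnUniqueIds) {
          order.append(id).append(",");
        }
        return order.toString().hashCode();
      }

      public static void main(String[] args) {
        OrderSensitiveKey before = new OrderSensitiveKey(Arrays.asList("c1", "c2", "c3"));
        OrderSensitiveKey after = new OrderSensitiveKey(Arrays.asList("c2", "c3", "c1"));
        System.out.println(before.equals(after));                  // false
        System.out.println(before.hashCode() == after.hashCode()); // false
      }
    }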

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index 34ce5d0..f2f2d8c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -346,15 +346,9 @@ public class SegmentPropertiesAndSchemaHolder {
       if (obj1 == null || obj2 == null || (obj1.size() != obj2.size())) {
         return false;
       }
-      List<ColumnSchema> clonedObj1 = new ArrayList<>(obj1);
-      List<ColumnSchema> clonedObj2 = new ArrayList<>(obj2);
-      clonedObj1.addAll(obj1);
-      clonedObj2.addAll(obj2);
-      sortList(clonedObj1);
-      sortList(clonedObj2);
       boolean exists = true;
       for (int i = 0; i < obj1.size(); i++) {
-        if (!clonedObj1.get(i).equalsWithStrictCheck(clonedObj2.get(i))) {
+        if (!obj1.get(i).equalsWithStrictCheck(obj2.get(i))) {
           exists = false;
           break;
         }
@@ -372,11 +366,14 @@ public class SegmentPropertiesAndSchemaHolder {
 
     @Override public int hashCode() {
       int allColumnsHashCode = 0;
+      // check column order
+      StringBuilder builder = new StringBuilder();
       for (ColumnSchema columnSchema: columnsInTable) {
         allColumnsHashCode = allColumnsHashCode + columnSchema.strictHashCode();
+        builder.append(columnSchema.getColumnUniqueId()).append(",");
       }
       return carbonTable.getAbsoluteTableIdentifier().hashCode() + allColumnsHashCode + Arrays
-          .hashCode(columnCardinality);
+          .hashCode(columnCardinality) + builder.toString().hashCode();
     }
 
     public AbsoluteTableIdentifier getTableIdentifier() {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index f06f5c3..6c048f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -605,7 +605,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // setting the size of fixed key column (dictionary column)
     blockExecutionInfo
         .setFixedLengthKeySize(getKeySize(projectDimensions, segmentProperties));
-    Set<Integer> dictionaryColumnChunkIndex = new HashSet<Integer>();
+    List<Integer> dictionaryColumnChunkIndex = new ArrayList<Integer>();
     List<Integer> noDictionaryColumnChunkIndex = new ArrayList<Integer>();
     // get the block index to be read from file for query dimension
     // for both dictionary columns and no dictionary columns
@@ -616,7 +616,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         dictionaryColumnChunkIndex.toArray(new Integer[dictionaryColumnChunkIndex.size()]));
     // need to sort the dictionary column as for all dimension
     // column key will be filled based on key order
-    Arrays.sort(queryDictionaryColumnChunkIndexes);
+    if (!queryModel.isForcedDetailRawQuery()) {
+      Arrays.sort(queryDictionaryColumnChunkIndexes);
+    }
     blockExecutionInfo.setDictionaryColumnChunkIndex(queryDictionaryColumnChunkIndexes);
     // setting the no dictionary column block indexes
     blockExecutionInfo.setNoDictionaryColumnChunkIndexes(ArrayUtils.toPrimitive(
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 49157f9..95fbe66 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -509,7 +509,7 @@ public class QueryUtil {
   public static void fillQueryDimensionChunkIndexes(
       List<ProjectionDimension> projectDimensions,
       Map<Integer, Integer> columnOrdinalToChunkIndexMapping,
-      Set<Integer> dictionaryDimensionChunkIndex,
+      List<Integer> dictionaryDimensionChunkIndex,
       List<Integer> noDictionaryDimensionChunkIndex) {
     for (ProjectionDimension queryDimension : projectDimensions) {
       if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)
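
As a toy illustration (hypothetical values, not the project's code) of why the
change from Set to List matters here: a forced detail raw query, as used by
compaction, projects the dictionary columns in the destination schema order,
and sorting the chunk indexes would silently reorder the bytes in every row.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ChunkIndexOrderDemo {
      public static void main(String[] args) {
        // Dictionary column chunk indexes in projection order, as a List
        // now preserves them (a HashSet would not).
        List<Integer> projectionOrder = new ArrayList<>(Arrays.asList(3, 1, 2));
        Integer[] chunkIndexes = projectionOrder.toArray(new Integer[0]);

        // Filter queries sort so keys are filled in key order; a forced
        // detail raw query must keep projection order, or column 3's
        // bytes would land in column 1's slot during compaction.
        boolean isForcedDetailRawQuery = true;
        if (!isForcedDetailRawQuery) {
          Arrays.sort(chunkIndexes);
        }
        System.out.println(Arrays.toString(chunkIndexes)); // [3, 1, 2]
      }
    }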
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ColumnDriftRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ColumnDriftRawResultIterator.java
new file mode 100644
index 0000000..d3fed76
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ColumnDriftRawResultIterator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.result.RowBatch;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A wrapper iterator over the detail raw query result iterator that handles
+ * the processing of raw rows whose columns drifted between measures and
+ * dimensions. It iterates over the result batches and returns one row at a time.
+ */
+public class ColumnDriftRawResultIterator extends RawResultIterator {
+
+  // column reorder for no-dictionary column
+  private int noDictCount;
+  private int[] noDictMap;
+  // column drift
+  private boolean[] isColumnDrift;
+  private int measureCount;
+  private DataType[] measureDataTypes;
+
+  /**
+   * LOGGER
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(ColumnDriftRawResultIterator.class.getName());
+
+  public ColumnDriftRawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
+      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+    super(detailRawQueryResultIterator, sourceSegProperties, destinationSegProperties, false);
+    initForColumnDrift();
+    init();
+  }
+
+  private void initForColumnDrift() {
+    List<CarbonDimension> noDictDims =
+        new ArrayList<>(destinationSegProperties.getDimensions().size());
+    for (CarbonDimension dimension : destinationSegProperties.getDimensions()) {
+      if (dimension.getNumberOfChild() == 0) {
+        if (!dimension.hasEncoding(Encoding.DICTIONARY)) {
+          noDictDims.add(dimension);
+        }
+      }
+    }
+    measureCount = destinationSegProperties.getMeasures().size();
+    noDictCount = noDictDims.size();
+    isColumnDrift = new boolean[noDictCount];
+    noDictMap = new int[noDictCount];
+    measureDataTypes = new DataType[noDictCount];
+    List<CarbonMeasure> sourceMeasures = sourceSegProperties.getMeasures();
+    int tableMeasureCount = sourceMeasures.size();
+    for (int i = 0; i < noDictCount; i++) {
+      for (int j = 0; j < tableMeasureCount; j++) {
+        if (RestructureUtil.isColumnMatches(true, noDictDims.get(i), sourceMeasures.get(j))) {
+          isColumnDrift[i] = true;
+          measureDataTypes[i] = sourceMeasures.get(j).getDataType();
+          break;
+        }
+      }
+    }
+    int noDictIndex = 0;
+    // the drifted columns are at the end of the measures
+    int measureIndex = measureCount + 1;
+    for (int i = 0; i < noDictCount; i++) {
+      if (isColumnDrift[i]) {
+        noDictMap[i] = measureIndex++;
+      } else {
+        noDictMap[i] = noDictIndex++;
+      }
+    }
+  }
+
+  @Override
+  protected Object[] convertRow(Object[] rawRow) throws KeyGenException {
+    super.convertRow(rawRow);
+    ByteArrayWrapper dimObject = (ByteArrayWrapper) rawRow[0];
+    // need to move the drifted measures to dimensions and return a new row matching the current schema
+    byte[][] noDicts = dimObject.getNoDictionaryKeys();
+    byte[][] newNoDicts = new byte[noDictCount][];
+    for (int i = 0; i < noDictCount; i++) {
+      if (isColumnDrift[i]) {
+        newNoDicts[i] = DataTypeUtil
+            .getBytesDataDataTypeForNoDictionaryColumn(rawRow[noDictMap[i]], measureDataTypes[i]);
+      } else {
+        newNoDicts[i] = noDicts[noDictMap[i]];
+      }
+    }
+    ByteArrayWrapper newWrapper = new ByteArrayWrapper();
+    newWrapper.setDictionaryKey(dimObject.getDictionaryKey());
+    newWrapper.setNoDictionaryKeys(newNoDicts);
+    newWrapper.setComplexTypesKeys(dimObject.getComplexTypesKeys());
+    newWrapper.setImplicitColumnByteArray(dimObject.getImplicitColumnByteArray());
+    Object[] finalRawRow = new Object[1 + measureCount];
+    finalRawRow[0] = newWrapper;
+    System.arraycopy(rawRow, 1, finalRawRow, 1, measureCount);
+    return finalRawRow;
+  }
+}
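
A worked toy example (hypothetical column layout, not the project's code) of
the noDictMap built in initForColumnDrift() above. Suppose the destination has
three no-dictionary dimensions, the middle of which drifted from a source
measure, and the destination keeps two measures; the drifted value then sits
in rawRow at index measureCount + 1:

    public class NoDictMapDemo {
      public static void main(String[] args) {
        // Destination no-dictionary dimensions: d0, drifted, d1 (hypothetical).
        boolean[] isColumnDrift = { false, true, false };
        int measureCount = 2;                 // measures kept in the destination row
        int[] noDictMap = new int[isColumnDrift.length];
        int noDictIndex = 0;
        int measureIndex = measureCount + 1;  // first drifted slot in rawRow
        for (int i = 0; i < isColumnDrift.length; i++) {
          noDictMap[i] = isColumnDrift[i] ? measureIndex++ : noDictIndex++;
        }
        // Prints [0, 3, 1]: slots 0 and 2 read the old no-dictionary keys,
        // slot 1 reads rawRow[3] (the drifted measure), which convertRow()
        // turns into no-dictionary bytes.
        System.out.println(java.util.Arrays.toString(noDictMap));
      }
    }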
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 1febb0b..4d471b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -41,9 +41,9 @@ import org.apache.log4j.Logger;
  */
 public class RawResultIterator extends CarbonIterator<Object[]> {
 
-  private final SegmentProperties sourceSegProperties;
+  protected final SegmentProperties sourceSegProperties;
 
-  private final SegmentProperties destinationSegProperties;
+  protected final SegmentProperties destinationSegProperties;
   /**
    * Iterator of the Batch raw result.
    */
@@ -66,18 +66,18 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
 
   public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
       SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
-      boolean isStreamingHandoff) {
+      boolean init) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
     this.executorService = Executors.newFixedThreadPool(1);
 
-    if (!isStreamingHandoff) {
+    if (init) {
       init();
     }
   }
 
-  private void init() {
+  protected void init() {
     this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
         CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
@@ -193,7 +193,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
     return this.currentRawRow;
   }
 
-  private Object[] convertRow(Object[] rawRow) throws KeyGenException {
+  protected Object[] convertRow(Object[] rawRow) throws KeyGenException {
     byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
     long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
     byte[] covertedBytes =
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 1b903f7..253c21c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -218,4 +218,7 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
     this.implicitColumnByteArray = implicitColumnByteArray;
   }
 
+  public byte[] getImplicitColumnByteArray() {
+    return implicitColumnByteArray;
+  }
 }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
index bf4bae6..3e23e91 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
@@ -32,7 +32,7 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       "yyyy-MM-dd HH:mm:ss")
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
     dropTable()
     prepareTable()
   }
@@ -359,6 +359,11 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
     loadData(tableName, baseTableName)
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
 
+    // alter table to change SORT_SCOPE and SORT_COLUMNS
+    sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField')")
+    loadData(tableName, baseTableName)
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+
     // alter table to local_sort with new SORT_COLUMNS
     sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='timestampField,
intField, stringField')")
     loadData(tableName, baseTableName)
@@ -370,40 +375,66 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
 
     // alter table to change SORT_COLUMNS
-    sql(s"alter table $tableName set tblproperties('sort_columns'='smallIntField, stringField,
intField')")
-    loadData(tableName, baseTableName)
-    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
-
-    // alter table to change SORT_SCOPE and SORT_COLUMNS
-    sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField,
bigIntField, smallIntField')")
+    sql(s"alter table $tableName set tblproperties('sort_columns'='intField, stringField')")
     loadData(tableName, baseTableName)
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
 
     // alter table to change SORT_SCOPE
-    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField,
bigIntField, smallIntField')")
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField,
smallIntField')")
     loadData(tableName, baseTableName)
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
 
+    // set input segments
+    (0 to 5).foreach { segment =>
+      sql(s"set carbon.input.segments.default.$tableName=$segment").collect()
+      sql(s"set carbon.input.segments.default.$baseTableName=$segment").collect()
+      checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+      checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+      checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+      checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+      checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+    }
+    sql(s"set carbon.input.segments.default.$tableName=*").collect()
+    sql(s"set carbon.input.segments.default.$baseTableName=*").collect()
+
     // query
     checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
     checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
     checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
     checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
 
-    // set input segments
-    (0 to 5).foreach { segment =>
-      sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
-      sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
-      checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
-    }
-    sql(s"set carbon.input.segments.default.$tableName=*").show(100, false)
-    sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false)
-
     // delete
     sql(s"delete from $tableName where smallIntField = 2")
     sql(s"delete from $baseTableName where smallIntField = 2")
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
 
+    // compaction for column drift
+    sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField,
intField')")
+    // [Segment info]:
+    //   | sorted | dimension order (sort_columns is in [])                                      | measure order
+    // ------------------------------------------------------------------------------------------------------------------------------------------------
+    // 0 | false  | timestampField, dateField, stringField, varcharField, charField              | smallIntField, intField, bigIntField, floatField, doubleField
+    // 1 | true   | [charField], timestampField, dateField, stringField, varcharField            | smallIntField, intField, bigIntField, floatField, doubleField
+    // 2 | false  | [timestampField, intField, stringField], charField, dateField, varcharField  | smallIntField, bigIntField, floatField, doubleField
+    // 3 | false  | [stringField, intField, timestampField], charField, dateField, varcharField  | smallIntField, bigIntField, floatField, doubleField
+    // 4 | false  | [intField, stringField], timestampField, charField, dateField, varcharField  | smallIntField, bigIntField, floatField, doubleField
+    // 5 | true   | [charField, smallIntField], intField, stringField, timestampField, dateField, varcharField | bigIntField, floatField, doubleField
+    // Column drift happened: intField and smallIntField became dimensions,
+    // and the order of the columns changed as well.
+    //
+    // [Table info]:
+    //       | dimension order (sort_columns is in [])                                                    | measure order
+    // ------------------------------------------------------------------------------------------------------------------------------------------------
+    // table | [charField], smallIntField, intField, stringField, timestampField, dateField, varcharField | bigIntField, floatField, doubleField
+    sql(s"alter table $tableName compact 'minor'")
+    sql(s"alter table $baseTableName compact 'minor'")
+    checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
+    checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
+
     sql(s"delete from $tableName")
     checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(0)))
     sql(s"delete from $baseTableName")
@@ -426,8 +457,8 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
 
     // update
-    sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField)
= (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where
smallIntField = 2").show()
-    sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField)
= (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where
smallIntField = 2").show()
+    sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField)
= (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where
smallIntField = 2").collect()
+    sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField)
= (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where
smallIntField = 2").collect()
     checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from
$baseTableName order by floatField"))
 
     // query
@@ -438,21 +469,20 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
 
     // set input segments
     (6 to 11).foreach { segment =>
-      sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false)
-      sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false)
+      sql(s"set carbon.input.segments.default.$tableName=$segment").collect()
+      sql(s"set carbon.input.segments.default.$baseTableName=$segment").collect()
       checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
     }
-    sql(s"set carbon.input.segments.default.$tableName=*").show(100, false)
-    sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false)
+    sql(s"set carbon.input.segments.default.$tableName=*").collect()
+    sql(s"set carbon.input.segments.default.$baseTableName=*").collect()
 
-    // compaction
-    sql(s"show segments for table $tableName").show(100, false)
-    sql(s"show segments for table $baseTableName").show(100, false)
+    // no_sort compaction flow for column drift
+    sql(s"alter table $tableName set tblproperties('sort_scope'='no_sort', 'sort_columns'='charField,
intField')")
+    // sort_scope become no_sort
     sql(s"alter table $tableName compact 'minor'")
     sql(s"alter table $baseTableName compact 'minor'")
-    sql(s"show segments for table $tableName").show(100, false)
-    sql(s"show segments for table $baseTableName").show(100, false)
     checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName"))
+    checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField"))
     checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField"))
     checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField"))
     checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField"))
@@ -508,6 +538,8 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
   }
 
   test("bloom filter") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
     val tableName = "alter_sc_bloom"
     val dataMapName = "alter_sc_bloom_dm1"
     val baseTableName = "alter_sc_bloom_base"
@@ -523,18 +555,18 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
   }
 
   test("pre-aggregate") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
     val tableName = "alter_sc_agg"
     val dataMapName = "alter_sc_agg_dm1"
     val baseTableName = "alter_sc_agg_base"
     loadData(tableName, baseTableName)
-    sql(s"SHOW DATAMAP ON TABLE $tableName").show(100, false)
     checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "preaggregate", dataMapName)
     checkExistence(sql(s"EXPLAIN select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName)
     checkAnswer(sql(s"select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,sum(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField"))
 
     sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField,
charField')")
     loadData(tableName, baseTableName)
-    sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField
= 'abc2' group by stringField").show(100, false)
     checkExistence(sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName
where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName)
     checkAnswer(sql(s"select stringField,max(intField) as sum from $tableName where stringField
= 'abc2' group by stringField"), sql(s"select stringField,max(intField) as sum from $baseTableName
where stringField = 'abc2' group by stringField"))
   }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 31417bc..d754781 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -74,7 +74,7 @@ class HandoffPartition(
  */
 class StreamingRawResultIterator(
     recordReader: RecordReader[Void, Any]
-) extends RawResultIterator(null, null, null, true) {
+) extends RawResultIterator(null, null, null, false) {
 
   override def hasNext: Boolean = {
     recordReader.nextKeyValue()
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index d9c7be7..28f1cf4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -37,10 +37,12 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
+import org.apache.carbondata.core.scan.result.iterator.ColumnDriftRawResultIterator;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.IntArrayWrapper;
 import org.apache.carbondata.core.stats.QueryStatistic;
@@ -175,11 +177,19 @@ public class CarbonCompactionExecutor {
   private RawResultIterator getRawResultIterator(Configuration configuration, String segmentId,
       String task, List<TableBlockInfo> tableBlockInfoList)
       throws QueryExecutionException, IOException {
-    return new RawResultIterator(
-        executeBlockList(tableBlockInfoList, segmentId, task, configuration),
-        getSourceSegmentProperties(
-            Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter())),
-        destinationSegProperties, false);
+    SegmentProperties sourceSegmentProperties = getSourceSegmentProperties(
+        Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter()));
+    boolean hasColumnDrift = carbonTable.hasColumnDrift() &&
+        RestructureUtil.hasColumnDriftOnSegment(carbonTable, sourceSegmentProperties);
+    if (hasColumnDrift) {
+      return new ColumnDriftRawResultIterator(
+          executeBlockList(tableBlockInfoList, segmentId, task, configuration),
+          sourceSegmentProperties, destinationSegProperties);
+    } else {
+      return new RawResultIterator(
+          executeBlockList(tableBlockInfoList, segmentId, task, configuration),
+          sourceSegmentProperties, destinationSegProperties, true);
+    }
   }
 
   /**

