carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [29/50] [abbrv] carbondata git commit: [CARBONDATA-2019] Enhancement of merge index compaction feature to support creation of merge index file on old store where index file does not contain the blocklet info
Date Sun, 28 Jan 2018 06:45:58 GMT
[CARBONDATA-2019] Enhancement of merge index compaction feature to support creation of merge
index file on old store where index file does not contain the blocklet info

Enhancement of merge index compaction feature to support creation of merge index file on old
store where index file does not contain the blocklet info.
An old store created with carbondata version 1.1 does not contain the blocklet info in the index
file. If a merge index file is created on such a store, the blocklet information will not be present
in the merge index file, so on the first query the carbondata file footer would again have to be
read to retrieve the blocklet information.
Benefits:

Support merge index file creation on old store
Improve first time query performance.
Note: First-time query performance is improved only if the merge index file is created before
the first query is run.

This closes #1782


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e820006b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e820006b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e820006b

Branch: refs/heads/fgdatamap
Commit: e820006bc5830d096c6d5e4ebeff887e6ca14e21
Parents: 04d40cc
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Tue Jan 9 20:32:36 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Jan 16 16:07:07 2018 +0530

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    | 128 +++++++++++++++++++
 .../util/AbstractDataFileFooterConverter.java   |  43 +++++--
 .../core/util/CarbonMetadataUtil.java           |   4 +-
 .../core/writer/CarbonIndexFileMergeWriter.java |  43 ++++++-
 .../spark/rdd/CarbonMergeFilesRDD.scala         |   6 +-
 .../carbondata/spark/util/CommonUtil.scala      |  36 ++++--
 .../CarbonAlterTableCompactionCommand.scala     |   7 +-
 7 files changed, 232 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 444a67b..9603090 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -21,11 +21,22 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
@@ -38,6 +49,11 @@ import org.apache.thrift.TBase;
 public class SegmentIndexFileStore {
 
   /**
+   * Logger constant
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SegmentIndexFileStore.class.getName());
+  /**
    * Stores the indexfile name and related binary file data in it.
    */
   private Map<String, byte[]> carbonIndexMap;
@@ -64,6 +80,23 @@ public class SegmentIndexFileStore {
   }
 
   /**
+   * read index file and fill the blocklet information
+   *
+   * @param segmentPath
+   * @throws IOException
+   */
+  public void readAllIndexAndFillBolckletInfo(String segmentPath) throws IOException {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        readMergeFile(carbonIndexFiles[i].getCanonicalPath());
+      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT))
{
+        readIndexAndFillBlockletInfo(carbonIndexFiles[i]);
+      }
+    }
+  }
+
+  /**
    * Read all index file names of the segment
    *
    * @param segmentPath
@@ -185,4 +218,99 @@ public class SegmentIndexFileStore {
   public Map<String, byte[]> getCarbonIndexMap() {
     return carbonIndexMap;
   }
+
+  /**
+   * This method will read the index information from carbon index file
+   *
+   * @param indexFile
+   * @return
+   * @throws IOException
+   */
+  private void readIndexAndFillBlockletInfo(CarbonFile indexFile) throws IOException {
+    // flag to take decision whether carbondata file footer reading is required.
+    // If the index file does not contain the file footer then carbondata file footer
+    // read is required else not required
+    boolean isCarbonDataFileFooterReadRequired = true;
+    List<BlockletInfo> blockletInfoList = null;
+    List<BlockIndex> blockIndexThrift =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    try {
+      indexReader.openThriftReader(indexFile.getCanonicalPath());
+      // get the index header
+      org.apache.carbondata.format.IndexHeader indexHeader = indexReader.readIndexHeader();
+      DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+      String filePath = indexFile.getCanonicalPath();
+      String parentPath =
+          filePath.substring(0, filePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+      while (indexReader.hasNext()) {
+        BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+        if (blockIndex.isSetBlocklet_info()) {
+          // this case will come in case segment index compaction property is set to false
from the
+          // application and alter table segment index compaction is run manually. In that
case
+          // blocklet info will be present in the index but read carbon data file footer
property
+          // will be true
+          isCarbonDataFileFooterReadRequired = false;
+          break;
+        } else {
+          TableBlockInfo blockInfo =
+              fileFooterConverter.getTableBlockInfo(blockIndex, indexHeader, parentPath);
+          blockletInfoList = getBlockletInfoFromIndexInfo(blockInfo);
+        }
+        // old store which does not have the blocklet info will have 1 count per part file
but in
+        // the current code, the number of entries in the index file is equal to the total
number
+        // of blocklets in all part files for 1 task. So to make it compatible with new structure,
+        // the same entry with different blocklet info need to be repeated
+        for (int i = 0; i < blockletInfoList.size(); i++) {
+          BlockIndex blockIndexReplica = blockIndex.deepCopy();
+          BlockletInfo blockletInfo = blockletInfoList.get(i);
+          blockIndexReplica
+              .setBlock_index(CarbonMetadataUtil.getBlockletIndex(blockletInfo.getBlockletIndex()));
+          blockIndexReplica
+              .setBlocklet_info(CarbonMetadataUtil.getBlocletInfo3(blockletInfo));
+          blockIndexThrift.add(blockIndexReplica);
+        }
+      }
+      // read complete file at once
+      if (!isCarbonDataFileFooterReadRequired) {
+        readIndexFile(indexFile);
+      } else {
+        int totalSize = 0;
+        List<byte[]> blockIndexByteArrayList =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        byte[] indexHeaderBytes = CarbonUtil.getByteArray(indexHeader);
+        totalSize += indexHeaderBytes.length;
+        blockIndexByteArrayList.add(indexHeaderBytes);
+        for (BlockIndex blockIndex : blockIndexThrift) {
+          byte[] indexInfoBytes = CarbonUtil.getByteArray(blockIndex);
+          totalSize += indexInfoBytes.length;
+          blockIndexByteArrayList.add(indexInfoBytes);
+        }
+        ByteBuffer byteBuffer = ByteBuffer.allocate(totalSize);
+        for (byte[] blockIndexBytes : blockIndexByteArrayList) {
+          byteBuffer.put(blockIndexBytes);
+        }
+        carbonIndexMap.put(indexFile.getName(), byteBuffer.array());
+      }
+    } finally {
+      indexReader.closeThriftReader();
+    }
+  }
+
+  /**
+   * This method will read the blocklet info from carbon data file and fill it to index info
+   *
+   * @param blockInfo
+   * @return
+   * @throws IOException
+   */
+  private List<BlockletInfo> getBlockletInfoFromIndexInfo(TableBlockInfo blockInfo)
+      throws IOException {
+    long startTime = System.currentTimeMillis();
+    DataFileFooter carbondataFileFooter = CarbonUtil.readMetadatFile(blockInfo);
+    LOGGER.info(
+        "Time taken to read carbondata file footer to get blocklet info " + blockInfo.getFilePath()
+            + " is " + (System.currentTimeMillis() - startTime));
+    return carbondataFileFooter.getBlockletList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index c5f9685..5ebf4cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -157,25 +157,14 @@ public abstract class AbstractDataFileFooterConverter {
         BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
         blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
         dataFileFooter = new DataFileFooter();
-        TableBlockInfo tableBlockInfo = new TableBlockInfo();
-        tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
-        ColumnarFormatVersion version =
-            ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion());
-        tableBlockInfo.setVersion(version);
-        int blockletSize = getBlockletSize(readBlockIndexInfo);
-        tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
-        String fileName = readBlockIndexInfo.file_name;
-        // Take only name of file.
-        if (fileName.lastIndexOf("/") > 0) {
-          fileName = fileName.substring(fileName.lastIndexOf("/"));
-        }
-        tableBlockInfo.setFilePath(parentPath + "/" + fileName);
+        TableBlockInfo tableBlockInfo =
+            getTableBlockInfo(readBlockIndexInfo, readIndexHeader, parentPath);
         dataFileFooter.setBlockletIndex(blockletIndex);
         dataFileFooter.setColumnInTable(columnSchemaList);
         dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
         dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
         dataFileFooter.setSegmentInfo(segmentInfo);
-        dataFileFooter.setVersionId(version);
+        dataFileFooter.setVersionId(tableBlockInfo.getVersion());
         if (readBlockIndexInfo.isSetBlocklet_info()) {
           List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
           BlockletInfo blockletInfo = new DataFileFooterConverterV3()
@@ -194,6 +183,32 @@ public abstract class AbstractDataFileFooterConverter {
   }
 
   /**
+   * This method will create a table block info object from index file info
+   *
+   * @param readBlockIndexInfo
+   * @param readIndexHeader
+   * @param parentPath
+   * @return
+   */
+  public TableBlockInfo getTableBlockInfo(BlockIndex readBlockIndexInfo,
+      org.apache.carbondata.format.IndexHeader readIndexHeader, String parentPath) {
+    TableBlockInfo tableBlockInfo = new TableBlockInfo();
+    tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+    ColumnarFormatVersion version =
+        ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion());
+    tableBlockInfo.setVersion(version);
+    int blockletSize = getBlockletSize(readBlockIndexInfo);
+    tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+    String fileName = readBlockIndexInfo.file_name;
+    // Take only name of file.
+    if (fileName.lastIndexOf("/") > 0) {
+      fileName = fileName.substring(fileName.lastIndexOf("/"));
+    }
+    tableBlockInfo.setFilePath(parentPath + "/" + fileName);
+    return tableBlockInfo;
+  }
+
+  /**
    * the methods returns the number of blocklets in a block
    *
    * @param readBlockIndexInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 6579a6f..0ca0df8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -93,7 +93,7 @@ public class CarbonMetadataUtil {
     return footer;
   }
 
-  private static BlockletIndex getBlockletIndex(
+  public static BlockletIndex getBlockletIndex(
       org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
 
@@ -282,7 +282,7 @@ public class CarbonMetadataUtil {
     return thriftBlockIndexList;
   }
 
-  private static BlockletInfo3 getBlocletInfo3(
+  public static BlockletInfo3 getBlocletInfo3(
       org.apache.carbondata.core.metadata.blocklet.BlockletInfo blockletInfo) {
     List<Long> dimensionChunkOffsets = blockletInfo.getDimensionChunkOffsets();
     dimensionChunkOffsets.addAll(blockletInfo.getMeasureChunkOffsets());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 067d024..01f96ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -41,15 +41,25 @@ public class CarbonIndexFileMergeWriter {
    * @param segmentPath
    * @param indexFileNamesTobeAdded while merging it comsiders only these files.
    *                                If null then consider all
+   * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
+   *                                         file. This will used in case of upgrade from
version
+   *                                         which do not store the blocklet info to current
version
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(
-      String segmentPath,
-      List<String> indexFileNamesTobeAdded) throws IOException {
+  private void mergeCarbonIndexFilesOfSegment(String segmentPath,
+      List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile)
+      throws IOException {
     CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-      fileStore.readAllIIndexOfSegment(segmentPath);
+      if (readFileFooterFromCarbonDataFile) {
+        // this case will be used in case of upgrade where old store will not have the blocklet
+        // info in the index file and therefore blocklet info need to be read from the file
footer
+        // in the carbondata file
+        fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+      } else {
+        fileStore.readAllIIndexOfSegment(segmentPath);
+      }
       Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
       MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
       MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
@@ -79,11 +89,34 @@ public class CarbonIndexFileMergeWriter {
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
+   *
+   * @param segmentPath
+   * @param indexFileNamesTobeAdded
+   * @throws IOException
+   */
+  public void mergeCarbonIndexFilesOfSegment(String segmentPath,
+      List<String> indexFileNamesTobeAdded) throws IOException {
+    mergeCarbonIndexFilesOfSegment(segmentPath, indexFileNamesTobeAdded, false);
+  }
+
+  /**
+   * Merge all the carbonindex files of segment to a  merged file
    * @param segmentPath
    * @throws IOException
    */
   public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, null);
+    mergeCarbonIndexFilesOfSegment(segmentPath, null, false);
+  }
+
+  /**
+   * Merge all the carbonindex files of segment to a  merged file
+   * @param segmentPath
+   * @param readFileFooterFromCarbonDataFile
+   * @throws IOException
+   */
+  public void mergeCarbonIndexFilesOfSegment(String segmentPath,
+      boolean readFileFooterFromCarbonDataFile) throws IOException {
+    mergeCarbonIndexFilesOfSegment(segmentPath, null, readFileFooterFromCarbonDataFile);
   }
 
   private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
index 87153f7..1087ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
@@ -39,7 +39,8 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String)
 class CarbonMergeFilesRDD(
     sc: SparkContext,
     tablePath: String,
-    segments: Seq[String])
+    segments: Seq[String],
+    readFileFooterFromCarbonDataFile: Boolean)
   extends CarbonRDD[String](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
@@ -53,7 +54,8 @@ class CarbonMergeFilesRDD(
       val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
       logInfo("Merging carbon index files of segment : " + split.segmentPath)
 
-      new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(split.segmentPath)
+      new CarbonIndexFileMergeWriter()
+        .mergeCarbonIndexFilesOfSegment(split.segmentPath, readFileFooterFromCarbonDataFile)
 
       var havePair = false
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 3f1f305..d96a051 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -893,16 +893,28 @@ object CommonUtil {
 
   /**
    * Merge the carbonindex files with in the segment to carbonindexmerge file inside same
segment
+   *
+   * @param sparkContext
+   * @param segmentIds
+   * @param tablePath
+   * @param carbonTable
+   * @param mergeIndexProperty
+   * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
+   *                                         file. This will used in case of upgrade from
version
+   *                                         which do not store the blocklet info to current
version
    */
   def mergeIndexFiles(sparkContext: SparkContext,
       segmentIds: Seq[String],
       tablePath: String,
       carbonTable: CarbonTable,
-      mergeIndexProperty: Boolean): Unit = {
+      mergeIndexProperty: Boolean,
+      readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
     if (mergeIndexProperty) {
-      new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-        carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
-        segmentIds).collect()
+      new CarbonMergeFilesRDD(
+        sparkContext,
+        carbonTable.getTablePath,
+        segmentIds,
+        readFileFooterFromCarbonDataFile).collect()
     } else {
       try {
         CarbonProperties.getInstance()
@@ -910,16 +922,20 @@ object CommonUtil {
         if (CarbonProperties.getInstance().getProperty(
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
           CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
-          new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-            carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
-            segmentIds).collect()
+          new CarbonMergeFilesRDD(
+            sparkContext,
+            carbonTable.getTablePath,
+            segmentIds,
+            readFileFooterFromCarbonDataFile).collect()
         }
       } catch {
         case _: Exception =>
           if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
-            new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-              carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
-              segmentIds).collect()
+            new CarbonMergeFilesRDD(
+              sparkContext,
+              carbonTable.getTablePath,
+              segmentIds,
+              readFileFooterFromCarbonDataFile).collect()
           }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e820006b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index b10777d..6af0e98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -220,11 +220,14 @@ case class CarbonAlterTableCompactionCommand(
         try {
           if (compactionType == CompactionType.SEGMENT_INDEX) {
             // Just launch job to merge index and return
-            CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+            CommonUtil.mergeIndexFiles(
+              sqlContext.sparkContext,
               CarbonDataMergerUtil.getValidSegmentList(
                 carbonTable.getAbsoluteTableIdentifier).asScala,
               carbonLoadModel.getTablePath,
-              carbonTable, true)
+              carbonTable,
+              mergeIndexProperty = true,
+              readFileFooterFromCarbonDataFile = true)
 
             val carbonMergerMapping = CarbonMergerMapping(carbonTable.getTablePath,
               carbonTable.getMetaDataFilepath,


Mime
View raw message