carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [29/50] [abbrv] carbondata git commit: [CARBONDATA-2714] Support merge index files for the segment
Date Wed, 18 Jul 2018 02:20:02 GMT
[CARBONDATA-2714] Support merge index files for the segment

Problem :
The first query on a carbon table is very slow, because many small carbonindex files have to be read and cached in the driver the first time.
Many carbonindex files are created in the following case:
Loading data in large cluster
For example, if the cluster has 100 nodes, then each load creates 100 index files per segment. So after 100 loads, the number of carbonindex files becomes 10000.
Reading all of these files from the driver is slow because it requires many namenode calls and IO operations.
Solution :
Merge the carbonindex files in two levels, so that we can reduce the IO calls to the namenode and improve read performance.
Merge within a segment.
Merge the carbonindex files into a single file within the segment immediately after the load completes. The merged file is named as a .carbonindexmerge file. It is not a true data merge but a simple file merge, so the current structure of the carbonindex files does not change. While reading, we read just one file instead of many carbonindex files within the segment.

This closes #2482


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/73419071
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/73419071
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/73419071

Branch: refs/heads/carbonstore
Commit: 73419071a308085be73c4a98fda57be241299fba
Parents: 6c5abdd
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Tue Jul 10 20:00:41 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon Jul 16 17:50:15 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  13 +-
 .../indexstore/BlockletDataMapIndexStore.java   |  12 +-
 .../core/metadata/SegmentFileStore.java         |  63 +++-
 .../core/util/BlockletDataMapUtil.java          |   2 +-
 .../sdv/generated/MergeIndexTestCase.scala      |  24 +-
 .../CarbonIndexFileMergeTestCase.scala          | 353 +++++++++++++++----
 .../dataload/TestGlobalSortDataLoad.scala       |   4 +-
 .../StandardPartitionTableCleanTestCase.scala   |   2 +-
 .../StandardPartitionTableLoadingTestCase.scala |   5 +
 .../carbondata/events/AlterTableEvents.scala    |  11 +
 .../carbondata/spark/util/CommonUtil.scala      |  59 +++-
 .../apache/spark/rdd/CarbonMergeFilesRDD.scala  | 100 ++++++
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  15 +-
 .../sql/events/MergeIndexEventListener.scala    | 180 ++++++++++
 .../CarbonAlterTableCompactionCommand.scala     |  23 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   1 -
 .../partition/TestAlterPartitionTable.scala     |  24 +-
 .../TestStreamingTableWithRowParser.scala       |  50 ---
 .../CarbonGetTableDetailComandTestCase.scala    |   4 +-
 .../processing/merger/CompactionType.java       |   1 +
 20 files changed, 751 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 3e2843c..e7e074d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1394,9 +1394,6 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SQLASTBUILDER_CLASSNAME =
       "spark.carbon.sqlastbuilder.classname";
 
-  public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME =
-      "spark.carbon.common.listener.register.classname";
-
   @CarbonProperty
   public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
       "carbon.lease.recovery.retry.count";
@@ -1871,6 +1868,16 @@ public final class CarbonCommonConstants {
    */
   public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
 
+  /**
+   * It is internal configuration and used only for test purpose.
+   * It will merge the carbon index files with in the segment to single segment.
+   */
+  @CarbonProperty
+  public static final String CARBON_MERGE_INDEX_IN_SEGMENT =
+      "carbon.merge.index.in.segment";
+
+  public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index d84f977..3918f3e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -106,11 +106,13 @@ public class BlockletDataMapIndexStore
                 new TableBlockIndexUniqueIdentifierWrapper(blockIndexUniqueIdentifier,
                     identifierWrapper.getCarbonTable()), indexFileStore, filesRead,
                 carbonDataFileBlockMetaInfoMapping);
-            BlockDataMap blockletDataMap =
-                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
-                    identifierWrapper.getCarbonTable(),
-                    identifierWrapper.isAddTableBlockToUnsafe());
-            dataMaps.add(blockletDataMap);
+            if (!blockMetaInfoMap.isEmpty()) {
+              BlockDataMap blockletDataMap =
+                  loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
+                      identifierWrapper.getCarbonTable(),
+                      identifierWrapper.isAddTableBlockToUnsafe());
+              dataMaps.add(blockletDataMap);
+            }
           }
           blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index ce79e65..3d08a2d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -375,9 +375,6 @@ public class SegmentFileStore {
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
         for (CarbonFile file : listFiles) {
           if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-            List<String> indexFiles =
-                new SegmentIndexFileStore().getIndexFilesFromMergeFile(file.getAbsolutePath());
-            folderDetails.getFiles().addAll(indexFiles);
             folderDetails.setMergeFileName(file.getName());
           } else {
             folderDetails.getFiles().add(file.getName());
@@ -470,10 +467,12 @@ public class SegmentFileStore {
    * @param ignoreStatus
    * @throws IOException
    */
-  private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+  private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus)
+      throws IOException {
     if (indexFilesMap != null) {
-      return;
+      return new ArrayList<>();
     }
+    List<String> indexOrMergeFiles = new ArrayList<>();
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     indexFilesMap = new HashMap<>();
     indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
@@ -487,7 +486,22 @@ public class SegmentFileStore {
         blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
       }
       indexFilesMap.put(entry.getKey(), blocks);
+      boolean added = false;
+      for (Map.Entry<String, List<String>> mergeFile : indexFileStore
+          .getCarbonMergeFileToIndexFilesMap().entrySet()) {
+        if (mergeFile.getValue().contains(entry.getKey()
+            .substring(entry.getKey().lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1,
+                entry.getKey().length()))) {
+          indexOrMergeFiles.add(mergeFile.getKey());
+          added = true;
+          break;
+        }
+      }
+      if (!added) {
+        indexOrMergeFiles.add(entry.getKey());
+      }
     }
+    return indexOrMergeFiles;
   }
 
   /**
@@ -693,11 +707,13 @@ public class SegmentFileStore {
         // take the list of files from this segment.
         SegmentFileStore fileStore =
             new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
-        fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+        List<String> indexOrMergeFiles =
+            fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
         if (forceDelete) {
           deletePhysicalPartition(
               partitionSpecs,
               fileStore.getIndexFilesMap(),
+              indexOrMergeFiles,
               table.getTablePath());
         }
         for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) {
@@ -707,11 +723,25 @@ public class SegmentFileStore {
               .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
                   indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
           if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
-            toBeDeletedIndexFiles.add(indexFile);
             // Add the corresponding carbondata files to the delete list.
             toBeDeletedDataFiles.addAll(entry.getValue());
           }
         }
+        for (String indexFile : indexOrMergeFiles) {
+          Long fileTimestamp = 0L;
+          if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+            fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+                .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                    indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
+          } else if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+            fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+                .substring(indexFile.lastIndexOf(CarbonCommonConstants.UNDERSCORE) + 1,
+                    indexFile.length() - CarbonTablePath.MERGE_INDEX_FILE_EXT.length()));
+          }
+          if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+            toBeDeletedIndexFiles.add(indexFile);
+          }
+        }
         if (toBeDeletedIndexFiles.size() > 0) {
           for (String dataFile : toBeDeletedIndexFiles) {
             FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
@@ -734,7 +764,7 @@ public class SegmentFileStore {
   public static void deleteSegment(String tablePath, String segmentFile,
       List<PartitionSpec> partitionSpecs) throws IOException {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
-    fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
       FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
@@ -742,7 +772,7 @@ public class SegmentFileStore {
         FileFactory.deleteFile(file, FileFactory.getFileType(file));
       }
     }
-    deletePhysicalPartition(partitionSpecs, indexFilesMap, tablePath);
+    deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, tablePath);
     String segmentFilePath =
         CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
             + segmentFile;
@@ -757,7 +787,20 @@ public class SegmentFileStore {
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap, String tablePath) throws IOException {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath)
+      throws IOException {
+    for (String indexOrMergFile : indexOrMergeFiles) {
+      if (null != partitionSpecs) {
+        Path location = new Path(indexOrMergFile);
+        boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
+        if (!exists) {
+          FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+        }
+      } else {
+        Path location = new Path(indexOrMergFile);
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+      }
+    }
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       if (partitionSpecs != null) {
         Path location = new Path(entry.getKey());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 8fa48a8..58df0f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -112,7 +112,7 @@ public class BlockletDataMapUtil {
         CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo());
       }
       String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (null == blockMetaInfoMap.get(blockPath)) {
+      if (null == blockMetaInfoMap.get(blockPath) && FileFactory.isFileExist(blockPath)) {
         blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 215ad0d..5ab88bc 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -23,11 +23,13 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.common.util._
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -39,30 +41,37 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
 
 
   override protected def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
   }
 
   test("Verify correctness of index merge sdv") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
     sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
 sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 1)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS carbon_automation_merge")
     sql(s"""create table carbon_automation_merge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADP
 artitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
 
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
       sql("""Select count(*) from carbon_automation_merge"""))
   }
 
   test("Verify command of index merge  sdv") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
     sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -72,15 +81,16 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect()
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 1)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") >= 1)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
+    sql("alter table carbon_automation_nonmerge compact 'SEGMENT_INDEX'")
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
   }
 
   test("Verify index index merge with compaction  sdv") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
     sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
 ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -93,9 +103,9 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") >= 1)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") >= 1)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "2") >= 1)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1", true) >= 1)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 5871262..8ee2275 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,22 +17,19 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
-import org.junit.Assert
-
-import scala.collection.JavaConverters._
-
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
 class CarbonIndexFileMergeTestCase
   extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -47,9 +44,14 @@ class CarbonIndexFileMergeTestCase
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
   }
 
   test("Verify correctness of index merge") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -60,6 +62,8 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS indexmerge")
     sql(
       """
@@ -69,15 +73,14 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
   }
 
   test("Verify command of index merge") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -92,17 +95,17 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
   test("Verify command of index merge without enabling property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -117,17 +120,16 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
-  test("Verify index index merge with compaction") {
+  test("Verify index merge with compaction") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -139,21 +141,23 @@ class CarbonIndexFileMergeTestCase
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
-        s"'GLOBAL_SORT_PARTITIONS'='100')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
-  test("Verify index index merge for compacted segments") {
+  test("Verify index merge for compacted segments") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,3")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -165,41 +169,71 @@ class CarbonIndexFileMergeTestCase
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
-        s"'GLOBAL_SORT_PARTITIONS'='100')")
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
-        s"'GLOBAL_SORT_PARTITIONS'='100')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
+    sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
   test("Query should not fail after iud operation on a table having merge indexes") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("drop table if exists mitable")
     sql("create table mitable(id int, issue date) stored by 'carbondata'")
     sql("insert into table mitable select '1','2000-02-01'")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "mitable")
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
     sql("update mitable set(id)=(2) where issue = '2000-02-01'").show()
     sql("clean files for table mitable")
     sql("select * from mitable").show()
+  }
+
+  test("Verify index merge for compacted segments MINOR") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
   // CARBONDATA-2704, test the index file size after merge
   test("Verify the size of the index file after merge") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS fileSize")
     sql(
       """
@@ -215,43 +249,215 @@ class CarbonIndexFileMergeTestCase
     Assert
       .assertEquals(getIndexOrMergeIndexFileSize(table, "0", CarbonTablePath.INDEX_FILE_EXT),
         segment0.head.getIndexSize.toLong)
-    new CarbonIndexFileMergeWriter(table)
-      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false, String.valueOf(System.currentTimeMillis()))
+    sql("Alter table fileSize compact 'segment_index'")
     loadMetadataDetails = SegmentStatusManager
       .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath))
     segment0 = loadMetadataDetails.filter(x=> x.getLoadName.equalsIgnoreCase("0"))
     Assert
       .assertEquals(getIndexOrMergeIndexFileSize(table, "0", CarbonTablePath.MERGE_INDEX_FILE_EXT),
         segment0.head.getIndexSize.toLong)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS fileSize")
   }
 
-  private def getIndexFileCount(tableName: String, segmentNo: String): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName)
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
-    if (FileFactory.isFileExist(segmentDir)) {
-      val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
-      indexFiles.asScala.map { f =>
-        if (f._2 == null) {
-          1
-        } else {
-          0
-        }
-      }.sum
-    } else {
-      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
-      if (segment != null) {
-        val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-        store.getSegmentFile.getLocationMap.values().asScala.map { f =>
-          if (f.getMergeFileName == null) {
-            f.getFiles.size()
-          } else {
-            0
-          }
-        }.sum
-      } else {
-        0
+  test("Verify index merge for compacted segments MINOR - level 2") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+  }
+
+  test("Verify index merge for compacted segments Auto Compaction") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,3")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect() // NOTE(review): rows is never used in this test
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "true") // NOTE(review): a DEFAULT_* constant is used as the property key; elsewhere in this file such constants are passed as values - confirm ENABLE_AUTO_LOAD_MERGE was not intended
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')"
+    )
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), Seq(Row(3000000)))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "false") // NOTE(review): same key question as above - confirm
+  }
+
+  test("Verify index merge for compacted segments Auto Compaction - level 2") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect() // NOTE(review): rows is never used in this test
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "true") // NOTE(review): a DEFAULT_* constant is used as the property key; elsewhere in this file such constants are passed as values - confirm ENABLE_AUTO_LOAD_MERGE was not intended
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+    s"'GLOBAL_SORT_PARTITIONS'='100')"
+    )
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), Seq(Row(3000000)))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "false") // NOTE(review): same key question as above - confirm
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+
+  test("Verify index merge for partition table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("DROP TABLE IF EXISTS partitionTable")
+    sql(
+      """
+        | CREATE TABLE partitionTable(id INT, name STRING, city STRING)
+        | PARTITIONED BY(age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE partitionTable OPTIONS('header'='false')")
+    assert(getIndexFileCount("default_partitionTable", "0") == 0)
+    sql("DROP TABLE IF EXISTS partitionTable")
+  }
+
+  test("Verify index merge for pre-aggregate table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("DROP TABLE IF EXISTS preAggTable")
+    sql(
+      """
+        | CREATE TABLE preAggTable(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE preAggTable OPTIONS('header'='false')")
+    assert(getIndexFileCount("default_preAggTable", "0") == 0) // no .carbonindex files in main-table segment 0
+    sql("create datamap preAggSum on table preAggTable using 'preaggregate' as " +
+        "select city,sum(age) as sum from preAggTable group by city")
+    assert(getIndexFileCount("default_preAggTable_preAggSum", "0") == 0) // same check for the datamap table
+    sql("DROP TABLE IF EXISTS preAggTable") // drop the table this test created
+  }
+
+  test("Verify index merge for streaming table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("DROP TABLE IF EXISTS streamingTable")
+    sql(
+      """
+        | CREATE TABLE streamingTable(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'streaming'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
+    assert(getIndexFileCount("default_streamingTable", "0") >= 1)
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table streamingTable compact 'segment_index'")
+    }.getMessage
+    assert(exceptionMessage.contains("Unsupported alter operation on carbon table: Merge index is not supported on streaming table"))
+    sql("DROP TABLE IF EXISTS streamingTable")
+  }
+
+  private def getIndexFileCount(tableName: String, segment: String): Int = {
+    val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
+    val path = CarbonTablePath
+      .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath
+          .INDEX_FILE_EXT)
       }
+    })
+    if (carbonFiles != null) {
+      carbonFiles.length
+    } else {
+      0
     }
   }
 
@@ -271,5 +477,4 @@ class CarbonIndexFileMergeTestCase
     })
     size
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index c40526d..0c42264 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -277,13 +277,13 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
     if (FileFactory.isFileExist(segmentDir)) {
-      assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+      assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
     } else {
       val segment = Segment.getSegment("0", carbonTable.getTablePath)
       val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
       store.readIndexFiles()
       val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
-      assertResult(Math.max(7, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
+      assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index cfc6983..319953c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -60,7 +60,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
     val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
     val segLoad = details.find(_.getLoadName.equals(segmentId)).get
     val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
-    assert(seg.getIndexFiles.size == indexes)
+    assert(seg.getIndexOrMergeFiles.size == indexes)
   }
 
   test("clean up partition table for int partition column") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 1db1f4a..b61583e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -337,6 +337,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
   }
 
   test("merge carbon index disable data loading for partition table for three partition column") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(
       """
         | CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp,
@@ -354,6 +356,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     store.readIndexFiles()
     store.getIndexFiles
     assert(store.getIndexFiles.size() == 10)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
   }
 
   test("load static partition table for one static partition column with load syntax issue") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 37abd73..1810df6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -204,6 +204,17 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
     alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
 
 /**
+ * Compaction Event for handling merge index in alter DDL
+ *
+ * @param sparkSession
+ * @param carbonTable
+ * @param alterTableModel
+ */
+case class AlterTableMergeIndexEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
+
+/**
  * pre event for standard hive partition
  * @param sparkSession
  * @param carbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 39530f4..1cd4d77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -29,6 +29,7 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.{Row, RowFactory}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -53,10 +54,8 @@ import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 
 
 object CommonUtil {
@@ -827,4 +826,60 @@ object CommonUtil {
     }
   }
 
+  /**
+   * Merge the carbonindex files with in the segment to carbonindexmerge file inside same
+   * segment.
+   *
+   * The merge runs when `mergeIndexProperty` is true, or otherwise when the carbon property
+   * CARBON_MERGE_INDEX_IN_SEGMENT resolves to true. When that property cannot be read or
+   * parsed as a boolean, CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT decides instead.
+   *
+   * @param sparkContext context used to run the merge job
+   * @param segmentIds ids of the segments whose index files are merged
+   * @param segmentFileNameToSegmentIdMap map that is looked up by segment id; the value found
+   *                                      is handed to the merge of that segment
+   * @param tablePath not read by this method, the path is taken from carbonTable
+   * @param carbonTable table that owns the segments
+   * @param mergeIndexProperty when true the merge is done without consulting the carbon
+   *                           property
+   * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
+   *                                         file. This will used in case of upgrade from version
+   *                                         which do not store the blocklet info to current
+   *                                         version
+   */
+  def mergeIndexFiles(sparkContext: SparkContext,
+    segmentIds: Seq[String],
+    segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+    tablePath: String,
+    carbonTable: CarbonTable,
+    mergeIndexProperty: Boolean,
+    readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
+    // Only the property lookup is guarded. A failure of the merge job itself has to reach the
+    // caller instead of being swallowed and the whole job silently started a second time.
+    val mergeRequired = mergeIndexProperty || {
+      try {
+        CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
+      } catch {
+        case _: Exception =>
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean
+      }
+    }
+    if (mergeRequired) {
+      new CarbonMergeFilesRDD(
+        sparkContext,
+        carbonTable,
+        segmentIds,
+        segmentFileNameToSegmentIdMap,
+        carbonTable.isHivePartitionTable,
+        readFileFooterFromCarbonDataFile).collect()
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
new file mode 100644
index 0000000..1acdf7e
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+/**
+ * Partition created by CarbonMergeFilesRDD, one for every segment to be merged.
+ *
+ * @param rddId id of the RDD the partition belongs to
+ * @param idx position of the partition inside the RDD
+ * @param segmentId id of the segment whose index files this partition merges
+ */
+case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
+  extends Partition {
+
+  override val index: Int = idx
+
+  // the hash is derived from the rdd id and the partition index only
+  override def hashCode(): Int = {
+    val seed = 41
+    seed * (seed + rddId) + idx
+  }
+}
+
+/**
+ * RDD to merge all carbonindex files of each segment to a carbonindexmerge file into the same
+ * segment. Every segment becomes one partition and computing a partition performs the merge
+ * of that segment; the RDD itself never produces an element.
+ *
+ * @param sc spark context
+ * @param carbonTable table that owns the segments
+ * @param segments segments to be merged
+ * @param segmentFileNameToSegmentIdMap map that is looked up by segment id; the value found is
+ *                                      passed to the merge call of that segment
+ * @param isHivePartitionedTable when true the partitioned segment merge of CarbonLoaderUtil is
+ *                               used, otherwise CarbonIndexFileMergeWriter
+ * @param readFileFooterFromCarbonDataFile forwarded to CarbonIndexFileMergeWriter, only read for
+ *                                         tables that are not hive partitioned
+ */
+class CarbonMergeFilesRDD(
+  sc: SparkContext,
+  carbonTable: CarbonTable,
+  segments: Seq[String],
+  segmentFileNameToSegmentIdMap: java.util.Map[String, String],
+  isHivePartitionedTable: Boolean,
+  readFileFooterFromCarbonDataFile: Boolean)
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
+
+  /** Creates one partition per segment, numbered in the order of `segments`. */
+  override def getPartitions: Array[Partition] = {
+    segments.zipWithIndex.map { case (segmentId, position) =>
+      CarbonMergeFilePartition(id, position, segmentId)
+    }.toArray
+  }
+
+  /**
+   * Merges the index files of the segment carried by the given partition.
+   *
+   * @return always an empty iterator, the merge is done purely for its side effect
+   */
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+    val tablePath = carbonTable.getTablePath
+    val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
+    logInfo("Merging carbon index files of segment : " +
+            CarbonTablePath.getSegmentPath(tablePath, split.segmentId))
+
+    if (isHivePartitionedTable) {
+      CarbonLoaderUtil
+        .mergeIndexFilesinPartitionedSegment(carbonTable, split.segmentId,
+          segmentFileNameToSegmentIdMap.get(split.segmentId))
+    } else {
+      new CarbonIndexFileMergeWriter(carbonTable)
+        .mergeCarbonIndexFilesOfSegment(split.segmentId,
+          tablePath,
+          readFileFooterFromCarbonDataFile,
+          segmentFileNameToSegmentIdMap.get(split.segmentId))
+    }
+
+    // The previous hand written iterator could never yield an element: it flagged itself as
+    // finished on the first hasNext call. An empty iterator states that directly.
+    Iterator.empty
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 70c4f12..074568d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -25,6 +25,7 @@ import scala.util.Try
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.events.MergeIndexEventListener
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -39,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -151,7 +152,6 @@ object CarbonEnv {
    */
   def init(sparkSession: SparkSession): Unit = {
     initListeners
-    registerCommonListener(sparkSession)
   }
 
   /**
@@ -181,14 +181,9 @@ object CarbonEnv {
       .addListener(classOf[AlterTableDropPartitionPostStatusEvent],
         AlterTableDropPartitionPostStatusListener)
       .addListener(classOf[AlterTableDropPartitionMetaEvent], AlterTableDropPartitionMetaListener)
-  }
-
-  def registerCommonListener(sparkSession: SparkSession): Unit = {
-    val clsName = Try(sparkSession.sparkContext.conf
-      .get(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME)).toOption.getOrElse("")
-    if (null != clsName && !clsName.isEmpty) {
-      CarbonReflectionUtils.createObject(clsName)
-    }
+      .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
+      .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
+      .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
new file mode 100644
index 0000000..a58e405
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.events
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.CarbonException
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.util.CommonUtil
+
+class MergeIndexEventListener extends OperationEventListener with Logging {
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Merges the carbonindex files of the segments the received event refers to.
+   *
+   * Three events are handled: the post execution event of a load, the post event of a
+   * compaction and the merge index event of the alter DDL. Nothing is merged for tables that
+   * are streaming sinks.
+   *
+   * @param event event delivered to this listener
+   * @param operationContext context of the running operation, not read here
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    event match {
+      case loadPostExecutionEvent: LoadTablePostExecutionEvent =>
+        LOGGER.audit("Load post status event-listener called for merge index")
+        val loadModel = loadPostExecutionEvent.getCarbonLoadModel
+        val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
+        val compactedSegments = loadModel.getMergedSegmentIds
+        val sparkSession = SparkSession.getActiveSession.get
+        if (!carbonTable.isStreamingSink) {
+          if (null != compactedSegments && !compactedSegments.isEmpty) {
+            // the load triggered a compaction, merge the index files of its result
+            mergeIndexFilesForCompactedSegments(sparkSession.sparkContext,
+              carbonTable,
+              compactedSegments)
+          } else {
+            // plain load: only the freshly loaded segment has to be merged
+            val segmentFileNameMap = new util.HashMap[String, String]()
+            segmentFileNameMap
+              .put(loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
+            CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
+              Seq(loadModel.getSegmentId),
+              segmentFileNameMap,
+              carbonTable.getTablePath,
+              carbonTable, false)
+          }
+        }
+      case compactionPostEvent: AlterTableCompactionPostEvent =>
+        LOGGER.audit("Merge index for compaction called")
+        val carbonTable = compactionPostEvent.carbonTable
+        val mergedLoads = compactionPostEvent.compactedLoads
+        val sparkContext = compactionPostEvent.sparkSession.sparkContext
+        if (!carbonTable.isStreamingSink) {
+          mergeIndexFilesForCompactedSegments(sparkContext, carbonTable, mergedLoads)
+        }
+      case mergeIndexEvent: AlterTableMergeIndexEvent =>
+        val carbonMainTable = mergeIndexEvent.carbonTable
+        val sparkSession = mergeIndexEvent.sparkSession
+        if (!carbonMainTable.isStreamingSink) {
+          val qualifiedTableName =
+            s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+          LOGGER.audit(s"Compaction request received for table $qualifiedTableName")
+          LOGGER.info(s"Merge Index request received for table $qualifiedTableName")
+          val lock = CarbonLockFactory.getCarbonLockObj(
+            carbonMainTable.getAbsoluteTableIdentifier,
+            LockUsage.COMPACTION_LOCK)
+
+          try {
+            if (lock.lockWithRetries()) {
+              LOGGER.info(s"Acquired the compaction lock for table $qualifiedTableName")
+              val validSegmentIds: mutable.Buffer[String] = CarbonDataMergerUtil
+                .getValidSegmentList(carbonMainTable.getAbsoluteTableIdentifier)
+                .asScala
+                .map(_.getSegmentNo)
+              val segmentFileNameMap = new util.HashMap[String, String]()
+              SegmentStatusManager
+                .readLoadMetadata(carbonMainTable.getMetadataPath)
+                .foreach { loadMetadataDetails =>
+                  segmentFileNameMap
+                    .put(loadMetadataDetails.getLoadName, loadMetadataDetails.getSegmentFile)
+                }
+              // the DDL asks for the merge explicitly, so it is forced with `true`
+              CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
+                validSegmentIds,
+                segmentFileNameMap,
+                carbonMainTable.getTablePath,
+                carbonMainTable,
+                true)
+              // the table name used to be a separate, discarded expression on the next line
+              val requestMessage = s"Compaction request completed for table $qualifiedTableName"
+              LOGGER.audit(requestMessage)
+              LOGGER.info(requestMessage)
+            } else {
+              val lockMessage =
+                s"Not able to acquire the compaction lock for table $qualifiedTableName"
+
+              LOGGER.audit(lockMessage)
+              LOGGER.error(lockMessage)
+              CarbonException.analysisException(
+                "Table is already locked for compaction. Please try after some time.")
+            }
+          } finally {
+            lock.unlock()
+          }
+        }
+    }
+  }
+
+  /**
+   * Merges the carbonindex files of those compacted segments that are still valid segments of
+   * the table. Nothing is done when none of the compacted segments is valid.
+   *
+   * @param sparkContext context used to run the merge job
+   * @param carbonTable table the compacted segments belong to
+   * @param mergedLoads names of the compacted loads; the segment id is the part of a name that
+   *                    follows CarbonCommonConstants.LOAD_FOLDER
+   */
+  def mergeIndexFilesForCompactedSegments(sparkContext: SparkContext,
+    carbonTable: CarbonTable,
+    mergedLoads: util.List[String]): Unit = {
+    // get only the valid segments of the table
+    val validSegments: mutable.Buffer[Segment] = CarbonDataMergerUtil.getValidSegmentList(
+      carbonTable.getAbsoluteTableIdentifier).asScala
+    // cut the load folder prefix off every load name to obtain the segment id
+    val mergedSegmentIds = mergedLoads.asScala.map { mergedLoad =>
+      mergedLoad.substring(mergedLoad.indexOf(CarbonCommonConstants.LOAD_FOLDER) +
+                           CarbonCommonConstants.LOAD_FOLDER.length)
+    }.toSet
+    // filter out only the valid segments from the list of compacted segments
+    // Example: say compacted segments list contains 0.1, 3.1, 6.1, 0.2.
+    // In this list 0.1, 3.1 and 6.1 are compacted to 0.2 in the level 2 compaction.
+    // So, it is enough to do merge index only for 0.2 as it is the only valid segment in this list
+    val validMergedSegIds = validSegments
+      .map(_.getSegmentNo)
+      .filter(mergedSegmentIds.contains)
+    // the guard has to look at the filtered ids: the unfiltered list can be non empty while
+    // no valid segment is left, which would start a merge job without any segment
+    if (validMergedSegIds.nonEmpty) {
+      val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, String]()
+      SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+        .foreach { loadMetadataDetails =>
+          segmentFileNameMap
+            .put(loadMetadataDetails.getLoadName, loadMetadataDetails.getSegmentFile)
+        }
+      CommonUtil.mergeIndexFiles(sparkContext,
+          validMergedSegIds,
+          segmentFileNameMap,
+          carbonTable.getTablePath,
+          carbonTable,
+          false)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index d467017..a4e52c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -102,23 +102,24 @@ case class CarbonAlterTableCompactionCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(table)) {
       throw new ConcurrentOperationException(table, "insert overwrite", "compaction")
     }
-    operationContext.setProperty("compactionException", "true")
     var compactionType: CompactionType = null
-    var compactionException = "true"
     try {
       compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     } catch {
       case _: Exception =>
-        val alterTableCompactionExceptionEvent: AlterTableCompactionAbortEvent =
-          AlterTableCompactionAbortEvent(sparkSession, table, alterTableModel)
-        OperationListenerBus.getInstance
-          .fireEvent(alterTableCompactionExceptionEvent, operationContext)
-        compactionException = operationContext.getProperty("compactionException").toString
+        throw new MalformedCarbonCommandException(
+          "Unsupported alter operation on carbon table")
     }
-    if (compactionException.equalsIgnoreCase("true") && null == compactionType) {
-      throw new MalformedCarbonCommandException(
-        "Unsupported alter operation on carbon table")
-    } else if (compactionException.equalsIgnoreCase("false")) {
+    if (compactionType == CompactionType.SEGMENT_INDEX) {
+      if (table.isStreamingSink) {
+        throw new MalformedCarbonCommandException(
+          "Unsupported alter operation on carbon table: Merge index is not supported on streaming" +
+          " table")
+      }
+      val alterTableMergeIndexEvent: AlterTableMergeIndexEvent =
+        AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
+      OperationListenerBus.getInstance
+        .fireEvent(alterTableMergeIndexEvent, operationContext)
       Seq.empty
     } else {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index d30e96d..b341d6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -67,7 +67,6 @@ object Spark2TestQueryExecutor {
     .enableHiveSupport()
     .config("spark.sql.warehouse.dir", warehouse)
     .config("spark.sql.crossJoin.enabled", "true")
-    .config(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME, "")
     .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
   if (warehouse.startsWith("hdfs://")) {
     System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/73419071/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 7cee409..6bfeb06 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -859,22 +859,14 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[String] = {
     val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
-    if (segment.getSegmentFileName != null) {
-      val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
-      sfs.readIndexFiles()
-      val indexFilesMap = sfs.getIndexFilesMap
-      val dataFiles = indexFilesMap.asScala.flatMap(_._2.asScala).map(f => new Path(f).getName)
-      dataFiles.toArray
-    } else {
-      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
-      val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-      val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        override def accept(file: CarbonFile): Boolean = {
-          return file.getName.endsWith(".carbondata")
-        }
-      })
-      dataFiles.map(_.getName)
-    }
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".carbondata")
+      }
+    })
+    dataFiles.map(_.getName)
   }
 
   /**


Mime
View raw message