carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [carbondata] 28/41: [CARBONDATA-3321] Improved Single/Concurrent query Performance
Date Tue, 02 Apr 2019 02:41:48 GMT
This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit fbea5c64899365bc1e0bef87abe939884c9a7ed8
Author: kumarvishal09 <kumarvishal1802@gmail.com>
AuthorDate: Tue Mar 19 19:22:30 2019 +0530

    [CARBONDATA-3321] Improved Single/Concurrent query Performance
    
    What changes were proposed in this pull request?
    Problem
    Single/concurrent queries are slow when the number of segments is large, because of the following root causes:
    
    Memory footprint is high, which increases GC and reduces query performance
    The unsafe data map row is converted to a safe data map row during pruning
    Multi-threaded pruning is not supported for non-filter queries
    Retrieval from the unsafe data map row is slow
    Solution
    
    Reduce the memory footprint during query
    The number of objects created during query execution was high, which increased GC time and hurt query performance.
    Reduced the memory footprint of temporary objects:
    a) Added lazy decoding of the data map row
    b) Removed convertToSafeRow, which was used to convert an UnsafeDataMapRow to a DataMapRowImpl for
    faster retrieval; instead, changed the unsafe data map row format itself for faster retrieval
    c) Reduced unnecessary String object creation (a simplified sketch follows)
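    
    A minimal, simplified sketch of the String-creation fix, mirroring the Segment change in this patch
    (the segmentString field and both constructor branches appear in the diff below):
    
        // Cache the segment's string form once in the constructor instead of
        // rebuilding it on every toString() call during pruning.
        public Segment(String segmentNo, String segmentFileName) {
          this.segmentNo = segmentNo;
          this.segmentFileName = segmentFileName;
          if (segmentFileName != null) {
            segmentString = segmentNo + "#" + segmentFileName;
          } else {
            segmentString = segmentNo;
          }
        }
    
        @Override public String toString() {
          return segmentString;   // no temporary String objects per call
        }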
    
    Added multi-threaded pruning for non-filter queries
    When the number of segments/blocks is large, pruning is slow for non-filter queries. Since multi-threaded
    pruning is already supported for filter queries, the same is now done for non-filter queries (see the sketch below)
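    
    A self-contained sketch of the idea, with placeholder types and a hypothetical pruneOneSegment
    helper (the actual patch groups datamaps into SegmentDataMapGroup batches and caps the thread
    count so the driver can still serve concurrent queries):
    
        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.Collections;
        import java.util.List;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.Future;
    
        // Fan per-segment pruning out over a fixed thread pool and merge the results.
        public class MultiThreadPruneSketch {
          // Hypothetical stand-in for the real per-segment datamap pruning.
          static List<String> pruneOneSegment(String segment) {
            return Collections.singletonList(segment + "#blocklet-0");
          }
    
          public static void main(String[] args) throws Exception {
            List<String> segments = Arrays.asList("0", "1", "2", "3");
            int numOfThreadsForPruning = 4;
            ExecutorService pool = Executors.newFixedThreadPool(numOfThreadsForPruning);
            List<Future<List<String>>> futures = new ArrayList<>();
            for (final String segment : segments) {
              futures.add(pool.submit(() -> pruneOneSegment(segment)));
            }
            List<String> blocklets = new ArrayList<>();
            for (Future<List<String>> future : futures) {
              blocklets.addAll(future.get());   // pruning failures surface here as ExecutionException
            }
            pool.shutdown();
            System.out.println(blocklets);
          }
        }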
    
    Changed the UnsafeMemoryDMStore storage format for faster retrieval.
    Earlier, only sequential access was allowed on UnsafeDataMapRow, so converting an
    UnsafeDataMapRow to a blocklet was slow and impacted query performance.
    Changed the format of UnsafeDataMapRow so that random access is possible, for faster retrieval (illustrated below)
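    
    A self-contained sketch of the offset-based row layout that enables random access, using a
    ByteBuffer in place of Unsafe (column sizes and values are illustrative only). The layout is
    [fixed columns][one offset per variable column][last offset][variable column data]; a variable
    column is read by loading its offset and the next offset and taking the difference:
    
        import java.nio.ByteBuffer;
    
        public class OffsetRowSketch {
          public static void main(String[] args) {
            byte[] name = "seg_0".getBytes();
            byte[] path = "/store/part-0".getBytes();
    
            int fixedLen = 4;                      // one fixed INT column
            int offsetsLen = 2 * 4 + 4;            // two variable-column offsets + last offset
            int varStart = fixedLen + offsetsLen;  // where variable data begins
            ByteBuffer row = ByteBuffer.allocate(varStart + name.length + path.length);
            row.putInt(0, 42);                                      // fixed column at byte position 0
            row.putInt(4, varStart);                                // offset of variable column 0
            row.putInt(8, varStart + name.length);                  // offset of variable column 1
            row.putInt(12, varStart + name.length + path.length);   // last offset
            row.position(varStart);
            row.put(name);
            row.put(path);
    
            // Random access read of variable column 1, no sequential scan of earlier columns:
            int off = row.getInt(8);
            int len = row.getInt(12) - off;        // next offset minus current offset
            byte[] out = new byte[len];
            row.position(off);
            row.get(out);
            System.out.println(new String(out));   // prints /store/part-0
          }
        }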
    
    How was this patch tested?
    Tested on a 17-node cluster with 5K/10K segments
    
    This closes #3154
---
 .../core/datamap/DataMapStoreManager.java          |  11 +-
 .../core/datamap/DistributableDataMapFormat.java   |   1 +
 .../apache/carbondata/core/datamap/Segment.java    |  24 +-
 .../carbondata/core/datamap/TableDataMap.java      |  58 ++--
 .../core/datastore/impl/FileFactory.java           |  43 ++-
 .../core/indexstore/ExtendedBlocklet.java          |  97 ++++--
 .../core/indexstore/SegmentPropertiesFetcher.java  |   3 +
 .../TableBlockIndexUniqueIdentifier.java           |   5 +-
 .../core/indexstore/UnsafeMemoryDMStore.java       | 161 +++++++---
 .../indexstore/blockletindex/BlockDataMap.java     |  88 +++---
 .../indexstore/blockletindex/BlockletDataMap.java  |  25 +-
 .../blockletindex/BlockletDataMapFactory.java      |   9 +-
 .../carbondata/core/indexstore/row/DataMapRow.java |  12 +-
 .../core/indexstore/row/UnsafeDataMapRow.java      | 217 ++-----------
 .../core/indexstore/schema/CarbonRowSchema.java    |   8 +
 .../core/indexstore/schema/SchemaGenerator.java    |  70 ++++
 .../carbondata/core/scan/model/QueryModel.java     |  30 --
 .../apache/carbondata/hadoop/CarbonInputSplit.java | 352 +++++++++++++++------
 .../hadoop/internal/ObjectArrayWritable.java       |   0
 .../carbondata/hadoop/internal/index/Block.java    |   0
 .../carbondata/hadoop/CarbonMultiBlockSplit.java   |  17 +-
 .../carbondata/hadoop/CarbonRecordReader.java      |   4 +-
 .../hadoop/api/CarbonFileInputFormat.java          |   6 +-
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  55 +---
 .../hadoop/api/CarbonTableInputFormat.java         |  51 +--
 .../hadoop/util/CarbonVectorizedRecordReader.java  |   2 +-
 .../presto/impl/CarbonLocalInputSplit.java         |   4 +-
 .../org/apache/carbondata/spark/util/Util.java     |   3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |   4 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  11 +-
 .../datasources/SparkCarbonFileFormat.scala        |   6 +-
 .../BloomCoarseGrainDataMapFunctionSuite.scala     |   2 +-
 32 files changed, 755 insertions(+), 624 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 085d98a..524d8b0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -473,6 +473,15 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
+    clearDataMaps(identifier, true);
+  }
+
+  /**
+   * Clear the datamap/datamaps of a table from memory
+   *
+   * @param identifier Table identifier
+   */
+  public void clearDataMaps(AbsoluteTableIdentifier identifier, boolean launchJob) {
     CarbonTable carbonTable = getCarbonTable(identifier);
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
@@ -483,7 +492,7 @@ public final class DataMapStoreManager {
         tableIndices = allDataMaps.get(tableUniqueName);
       }
     }
-    if (null != carbonTable && tableIndices != null) {
+    if (null != carbonTable && tableIndices != null && launchJob) {
       try {
         DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
       } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 007541d..4c23008 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -110,6 +110,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
                 distributable.getDistributable(),
                 dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
         for (ExtendedBlocklet blocklet : blocklets) {
+          blocklet.getDetailInfo();
           blocklet.setDataMapUniqueId(distributable.getUniqueId());
         }
         blockletIterator = blocklets.iterator();
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 85445eb..4797b53 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -62,6 +62,8 @@ public class Segment implements Serializable {
    */
   private LoadMetadataDetails loadMetadataDetails;
 
+  private String segmentString;
+
   public Segment(String segmentNo) {
     this.segmentNo = segmentNo;
   }
@@ -69,6 +71,7 @@ public class Segment implements Serializable {
   public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
     this.segmentNo = segmentNo;
     this.readCommittedScope = readCommittedScope;
+    segmentString = segmentNo;
   }
 
   /**
@@ -82,6 +85,11 @@ public class Segment implements Serializable {
     this.segmentNo = segmentNo;
     this.segmentFileName = segmentFileName;
     this.readCommittedScope = null;
+    if (segmentFileName != null) {
+      segmentString = segmentNo + "#" + segmentFileName;
+    } else {
+      segmentString = segmentNo;
+    }
   }
 
   /**
@@ -94,6 +102,11 @@ public class Segment implements Serializable {
     this.segmentNo = segmentNo;
     this.segmentFileName = segmentFileName;
     this.readCommittedScope = readCommittedScope;
+    if (segmentFileName != null) {
+      segmentString = segmentNo + "#" + segmentFileName;
+    } else {
+      segmentString = segmentNo;
+    }
   }
 
   /**
@@ -107,6 +120,11 @@ public class Segment implements Serializable {
     this.segmentFileName = segmentFileName;
     this.readCommittedScope = readCommittedScope;
     this.loadMetadataDetails = loadMetadataDetails;
+    if (segmentFileName != null) {
+      segmentString = segmentNo + "#" + segmentFileName;
+    } else {
+      segmentString = segmentNo;
+    }
   }
 
   /**
@@ -233,11 +251,7 @@ public class Segment implements Serializable {
   }
 
   @Override public String toString() {
-    if (segmentFileName != null) {
-      return segmentNo + "#" + segmentFileName;
-    } else {
-      return segmentNo;
-    }
+    return segmentString;
   }
 
   public LoadMetadataDetails getLoadMetadataDetails() {
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 15b0e8b..f9020bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -127,7 +127,7 @@ public final class TableDataMap extends OperationEventListener {
       }
       blocklets.addAll(addSegmentId(
           blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
-          segment.toString()));
+          segment));
     }
     return blocklets;
   }
@@ -148,15 +148,11 @@ public final class TableDataMap extends OperationEventListener {
     final List<ExtendedBlocklet> blocklets = new ArrayList<>();
     final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
     // for non-filter queries
-    if (filterExp == null) {
-      // if filter is not passed, then return all the blocklets.
-      return pruneWithoutFilter(segments, partitions, blocklets);
-    }
     // for filter queries
     int totalFiles = 0;
     int datamapsCount = 0;
     for (Segment segment : segments) {
-      for (DataMap dataMap : dataMaps.get(segment)) {
+      for (DataMap dataMap: dataMaps.get(segment)) {
         totalFiles += dataMap.getNumberOfEntries();
         datamapsCount++;
       }
@@ -168,11 +164,16 @@ public final class TableDataMap extends OperationEventListener {
       // As 0.1 million files block pruning can take only 1 second.
       // Doing multi-thread for smaller values is not recommended as
       // driver should have minimum threads opened to support multiple concurrent queries.
+      if (filterExp == null) {
+        // if filter is not passed, then return all the blocklets.
+        return pruneWithoutFilter(segments, partitions, blocklets);
+      }
       return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
     }
     // handle by multi-thread
-    return pruneWithFilterMultiThread(segments, filterExp, partitions, blocklets, dataMaps,
-        totalFiles);
+    List<ExtendedBlocklet> extendedBlocklets =
+        pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
+    return extendedBlocklets;
   }
 
   private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
@@ -181,7 +182,7 @@ public final class TableDataMap extends OperationEventListener {
       List<Blocklet> allBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
       blocklets.addAll(
           addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment),
-              segment.toString()));
+              segment));
     }
     return blocklets;
   }
@@ -197,12 +198,12 @@ public final class TableDataMap extends OperationEventListener {
       }
       blocklets.addAll(
           addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
-              segment.toString()));
+              segment));
     }
     return blocklets;
   }
 
-  private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments,
+  private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
       final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
       List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
       int totalFiles) {
@@ -279,7 +280,8 @@ public final class TableDataMap extends OperationEventListener {
       throw new RuntimeException(" not all the files processed ");
     }
     List<Future<Void>> results = new ArrayList<>(numOfThreadsForPruning);
-    final Map<Segment, List<Blocklet>> prunedBlockletMap = new ConcurrentHashMap<>(segments.size());
+    final Map<Segment, List<ExtendedBlocklet>> prunedBlockletMap =
+        new ConcurrentHashMap<>(segments.size());
     final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
     final String threadName = Thread.currentThread().getName();
     for (int i = 0; i < numOfThreadsForPruning; i++) {
@@ -288,16 +290,22 @@ public final class TableDataMap extends OperationEventListener {
         @Override public Void call() throws IOException {
           Thread.currentThread().setName(threadName);
           for (SegmentDataMapGroup segmentDataMapGroup : segmentDataMapGroups) {
-            List<Blocklet> pruneBlocklets = new ArrayList<>();
+            List<ExtendedBlocklet> pruneBlocklets = new ArrayList<>();
             List<DataMap> dataMapList = dataMaps.get(segmentDataMapGroup.getSegment());
+            SegmentProperties segmentProperties =
+                segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
+            Segment segment = segmentDataMapGroup.getSegment();
             for (int i = segmentDataMapGroup.getFromIndex();
                  i <= segmentDataMapGroup.getToIndex(); i++) {
-              pruneBlocklets.addAll(dataMapList.get(i).prune(filterExp,
-                  segmentPropertiesFetcher.getSegmentProperties(segmentDataMapGroup.getSegment()),
-                  partitions));
+              List<Blocklet> dmPruneBlocklets  = dataMapList.get(i).prune(filterExp,
+                  segmentProperties,
+                  partitions);
+              pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
+                      .getExtendedBlocklets(dmPruneBlocklets, segment),
+                  segment));
             }
             synchronized (prunedBlockletMap) {
-              List<Blocklet> pruneBlockletsExisting =
+              List<ExtendedBlocklet> pruneBlockletsExisting =
                   prunedBlockletMap.get(segmentDataMapGroup.getSegment());
               if (pruneBlockletsExisting != null) {
                 pruneBlockletsExisting.addAll(pruneBlocklets);
@@ -324,14 +332,8 @@ public final class TableDataMap extends OperationEventListener {
         throw new RuntimeException(e);
       }
     }
-    for (Map.Entry<Segment, List<Blocklet>> entry : prunedBlockletMap.entrySet()) {
-      try {
-        blocklets.addAll(addSegmentId(
-            blockletDetailsFetcher.getExtendedBlocklets(entry.getValue(), entry.getKey()),
-            entry.getKey().toString()));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    for (Map.Entry<Segment, List<ExtendedBlocklet>> entry : prunedBlockletMap.entrySet()) {
+      blocklets.addAll(entry.getValue());
     }
     return blocklets;
   }
@@ -353,9 +355,9 @@ public final class TableDataMap extends OperationEventListener {
   }
 
   private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> pruneBlocklets,
-      String segmentId) {
+      Segment segment) {
     for (ExtendedBlocklet blocklet : pruneBlocklets) {
-      blocklet.setSegmentId(segmentId);
+      blocklet.setSegment(segment);
     }
     return pruneBlocklets;
   }
@@ -425,7 +427,7 @@ public final class TableDataMap extends OperationEventListener {
         detailedBlocklet.setDataMapWriterPath(blockletwritePath);
         serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
       }
-      detailedBlocklet.setSegmentId(distributable.getSegment().toString());
+      detailedBlocklet.setSegment(distributable.getSegment());
       detailedBlocklets.add(detailedBlocklet);
     }
     return detailedBlocklets;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 7dbbe2a..a27023f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -81,19 +81,46 @@ public final class FileFactory {
   }
 
   public static FileType getFileType(String path) {
-    String lowerPath = path.toLowerCase();
-    if (lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+    FileType fileType = getFileTypeWithActualPath(path);
+    if (fileType != null) {
+      return fileType;
+    }
+    fileType = getFileTypeWithLowerCase(path);
+    if (fileType != null) {
+      return fileType;
+    }
+    return FileType.LOCAL;
+  }
+
+  private static FileType getFileTypeWithLowerCase(String path) {
+    String lowerCase = path.toLowerCase();
+    if (lowerCase.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
       return FileType.HDFS;
-    } else if (lowerPath.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
+    } else if (lowerCase.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
       return FileType.ALLUXIO;
-    } else if (lowerPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+    } else if (lowerCase.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
       return FileType.VIEWFS;
-    } else if (lowerPath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
-        lowerPath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+    } else if (lowerCase.startsWith(CarbonCommonConstants.S3N_PREFIX) || lowerCase
+        .startsWith(CarbonCommonConstants.S3A_PREFIX) || lowerCase
+        .startsWith(CarbonCommonConstants.S3_PREFIX)) {
       return FileType.S3;
     }
-    return FileType.LOCAL;
+    return null;
+  }
+
+  private static FileType getFileTypeWithActualPath(String path) {
+    if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+      return FileType.HDFS;
+    } else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
+      return FileType.ALLUXIO;
+    } else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+      return FileType.VIEWFS;
+    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX) || path
+        .startsWith(CarbonCommonConstants.S3A_PREFIX) || path
+        .startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      return FileType.S3;
+    }
+    return null;
   }
 
   public static CarbonFile getCarbonFile(String path) {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 22dff8e..8c4ea06 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -16,58 +16,67 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
 /**
  * Detailed blocklet information
  */
 public class ExtendedBlocklet extends Blocklet {
 
-  private String segmentId;
-
-  private BlockletDetailInfo detailInfo;
-
-  private long length;
-
-  private String[] location;
-
-  private String dataMapWriterPath;
-
   private String dataMapUniqueId;
 
-  public ExtendedBlocklet(String filePath, String blockletId) {
-    super(filePath, blockletId);
-  }
+  private CarbonInputSplit inputSplit;
 
   public ExtendedBlocklet(String filePath, String blockletId,
-      boolean compareBlockletIdForObjectMatching) {
+      boolean compareBlockletIdForObjectMatching, ColumnarFormatVersion version) {
     super(filePath, blockletId, compareBlockletIdForObjectMatching);
+    try {
+      this.inputSplit = CarbonInputSplit.from(null, blockletId, filePath, 0, 0, version, null);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
-  public BlockletDetailInfo getDetailInfo() {
-    return detailInfo;
+  public ExtendedBlocklet(String filePath, String blockletId, ColumnarFormatVersion version) {
+    this(filePath, blockletId, true, version);
   }
 
-  public void setDetailInfo(BlockletDetailInfo detailInfo) {
-    this.detailInfo = detailInfo;
+  public BlockletDetailInfo getDetailInfo() {
+    return this.inputSplit.getDetailInfo();
   }
 
-  public void setLocation(String[] location) {
-    this.location = location;
+  public void setDataMapRow(DataMapRow dataMapRow) {
+    this.inputSplit.setDataMapRow(dataMapRow);
   }
 
   public String[] getLocations() {
-    return location;
+    try {
+      return this.inputSplit.getLocations();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public long getLength() {
-    return length;
+    return this.inputSplit.getLength();
   }
 
   public String getSegmentId() {
-    return segmentId;
+    return this.inputSplit.getSegmentId();
   }
 
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
+  public Segment getSegment() {
+    return this.inputSplit.getSegment();
+  }
+  public void setSegment(Segment segment) {
+    this.inputSplit.setSegment(segment);
   }
 
   public String getPath() {
@@ -75,11 +84,11 @@ public class ExtendedBlocklet extends Blocklet {
   }
 
   public String getDataMapWriterPath() {
-    return dataMapWriterPath;
+    return this.inputSplit.getDataMapWritePath();
   }
 
   public void setDataMapWriterPath(String dataMapWriterPath) {
-    this.dataMapWriterPath = dataMapWriterPath;
+    this.inputSplit.setDataMapWritePath(dataMapWriterPath);
   }
 
   public String getDataMapUniqueId() {
@@ -98,13 +107,41 @@ public class ExtendedBlocklet extends Blocklet {
     }
 
     ExtendedBlocklet that = (ExtendedBlocklet) o;
-
-    return segmentId != null ? segmentId.equals(that.segmentId) : that.segmentId == null;
+    return inputSplit.getSegmentId() != null ?
+        inputSplit.getSegmentId().equals(that.inputSplit.getSegmentId()) :
+        that.inputSplit.getSegmentId() == null;
   }
 
   @Override public int hashCode() {
     int result = super.hashCode();
-    result = 31 * result + (segmentId != null ? segmentId.hashCode() : 0);
+    result = 31 * result + (inputSplit.getSegmentId() != null ?
+        inputSplit.getSegmentId().hashCode() :
+        0);
     return result;
   }
+
+  public CarbonInputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  public void setColumnCardinality(int[] cardinality) {
+    inputSplit.setColumnCardinality(cardinality);
+  }
+
+  public void setLegacyStore(boolean isLegacyStore) {
+    inputSplit.setLegacyStore(isLegacyStore);
+  }
+
+  public void setUseMinMaxForPruning(boolean useMinMaxForPruning) {
+    this.inputSplit.setUseMinMaxForPruning(useMinMaxForPruning);
+  }
+
+  public void setIsBlockCache(boolean isBlockCache) {
+    this.inputSplit.setIsBlockCache(isBlockCache);
+  }
+
+  public void setColumnSchema(List<ColumnSchema> columnSchema) {
+    this.inputSplit.setColumnSchema(columnSchema);
+  }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index b7fb98c..03f8a1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.indexstore;
 import java.io.IOException;
 
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 
 /**
@@ -35,4 +36,6 @@ public interface SegmentPropertiesFetcher {
    */
   SegmentProperties getSegmentProperties(Segment segment)
       throws IOException;
+
+  SegmentProperties getSegmentPropertiesFromDataMap(DataMap coarseGrainDataMap) throws IOException;
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index 3226ceb..9f6a76e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -37,12 +37,15 @@ public class TableBlockIndexUniqueIdentifier implements Serializable {
 
   private String segmentId;
 
+  private String uniqueName;
+
   public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileName,
       String mergeIndexFileName, String segmentId) {
     this.indexFilePath = indexFilePath;
     this.indexFileName = indexFileName;
     this.mergeIndexFileName = mergeIndexFileName;
     this.segmentId = segmentId;
+    this.uniqueName = indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
   }
 
   /**
@@ -51,7 +54,7 @@ public class TableBlockIndexUniqueIdentifier implements Serializable {
    * @return
    */
   public String getUniqueTableSegmentIdentifier() {
-    return indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
+    return this.uniqueName;
   }
 
   public String getIndexFilePath() {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 0db1b0a..8185c25 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.indexstore;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
@@ -52,7 +53,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
     this.allocatedSize = capacity;
     this.memoryBlock =
         UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize);
-    this.pointers = new int[1000];
+    this.pointers = new int[100];
   }
 
   /**
@@ -66,7 +67,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
       increaseMemory(runningLength + rowSize);
     }
     if (this.pointers.length <= rowCount + 1) {
-      int[] newPointer = new int[pointers.length + 1000];
+      int[] newPointer = new int[pointers.length + 100];
       System.arraycopy(pointers, 0, newPointer, 0, pointers.length);
       this.pointers = newPointer;
     }
@@ -84,9 +85,33 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
   /**
    * Add the index row to unsafe.
+   * Below format is used to store data in memory block
+   * WRITE:
+   * <FD><FD><FD><VO><VO><VO><LO><VD><VD><VD>
+   * FD: Fixed Column data
+   * VO: Variable column data offset
+   * VD: Variable column data
+   * LO: Last Offset
+   *
+   * Read:
+   * FD: Read directly based on the byte position added in CarbonRowSchema
+   *
+   * VD: Read based on below logic
+   * if not the last variable column schema
+   * X = read actual variable column offset based on the byte position added in CarbonRowSchema
+   * Y = read next variable column offset (next 4 bytes)
+   * get the length
+   * len = (Y - X)
+   * read data from offset X of size len
+   *
+   * if the last variable column
+   * X = read actual variable column offset based on the byte position added in CarbonRowSchema
+   * Y = read last offset (next 4 bytes)
+   * get the length
+   * len = (Y - X)
+   * read data from offset X of size len
    *
    * @param indexRow
-   * @return
    */
   public void addIndexRow(CarbonRowSchema[] schema, DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
@@ -94,88 +119,122 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
     // Check whether allocated memory is sufficient or not.
     ensureSize(rowSize);
     int pointer = runningLength;
-
+    int bytePosition = 0;
     for (int i = 0; i < schema.length; i++) {
-      addToUnsafe(schema[i], indexRow, i);
+      switch (schema[i].getSchemaType()) {
+        case STRUCT:
+          CarbonRowSchema[] childSchemas =
+              ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas();
+          for (int j = 0; j < childSchemas.length; j++) {
+            if (childSchemas[j].getBytePosition() > bytePosition) {
+              bytePosition = childSchemas[j].getBytePosition();
+            }
+          }
+          break;
+        default:
+          if (schema[i].getBytePosition() > bytePosition) {
+            bytePosition = schema[i].getBytePosition();
+          }
+      }
     }
+    // byte position of Last offset
+    bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    // start byte position of variable length data
+    int varColPosition = bytePosition + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    // current position refers to the current byte position in the memory block
+    int currentPosition;
+    for (int i = 0; i < schema.length; i++) {
+      switch (schema[i].getSchemaType()) {
+        case STRUCT:
+          CarbonRowSchema[] childSchemas =
+              ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas();
+          DataMapRow row = indexRow.getRow(i);
+          for (int j = 0; j < childSchemas.length; j++) {
+            currentPosition = addToUnsafe(childSchemas[j], row, j, pointer, varColPosition);
+            if (currentPosition > 0) {
+              varColPosition = currentPosition;
+            }
+          }
+          break;
+        default:
+          currentPosition = addToUnsafe(schema[i], indexRow, i, pointer, varColPosition);
+          if (currentPosition > 0) {
+            varColPosition = currentPosition;
+          }
+          break;
+      }
+    }
+    // writing the last offset
+    getUnsafe()
+        .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + pointer + bytePosition,
+            varColPosition);
+    // after adding the last offset, increment the length by 4 bytes since the last
+    // position is written as an INT
+    runningLength += CarbonCommonConstants.INT_SIZE_IN_BYTE;
     pointers[rowCount++] = pointer;
   }
 
-  private void addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index) {
+  private int addToUnsafe(CarbonRowSchema schema, DataMapRow row, int index, int startOffset,
+      int varPosition) {
     switch (schema.getSchemaType()) {
       case FIXED:
         DataType dataType = schema.getDataType();
         if (dataType == DataTypes.BYTE) {
-          getUnsafe()
-              .putByte(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getByte(index));
+          getUnsafe().putByte(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getByte(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.BOOLEAN) {
-          getUnsafe()
-              .putBoolean(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getBoolean(index));
+          getUnsafe().putBoolean(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getBoolean(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.SHORT) {
-          getUnsafe()
-              .putShort(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getShort(index));
+          getUnsafe().putShort(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getShort(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.INT) {
-          getUnsafe()
-              .putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getInt(index));
+          getUnsafe().putInt(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getInt(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.LONG) {
-          getUnsafe()
-              .putLong(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getLong(index));
+          getUnsafe().putLong(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getLong(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.FLOAT) {
-          getUnsafe()
-              .putFloat(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getFloat(index));
+          getUnsafe().putFloat(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getFloat(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.DOUBLE) {
-          getUnsafe()
-              .putDouble(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + runningLength,
-                  row.getDouble(index));
+          getUnsafe().putDouble(memoryBlock.getBaseObject(),
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(),
+              row.getDouble(index));
           runningLength += row.getSizeInBytes(index);
         } else if (dataType == DataTypes.BYTE_ARRAY) {
           byte[] data = row.getByteArray(index);
           getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-              memoryBlock.getBaseOffset() + runningLength, data.length);
+              memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), data.length);
           runningLength += row.getSizeInBytes(index);
         } else {
           throw new UnsupportedOperationException(
               "unsupported data type for unsafe storage: " + schema.getDataType());
         }
-        break;
+        return 0;
       case VARIABLE_SHORT:
-        byte[] data = row.getByteArray(index);
-        getUnsafe().putShort(memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, (short) data.length);
-        runningLength += 2;
-        getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, data.length);
-        runningLength += data.length;
-        break;
       case VARIABLE_INT:
-        byte[] data2 = row.getByteArray(index);
+        byte[] data = row.getByteArray(index);
         getUnsafe().putInt(memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, data2.length);
+            memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition);
         runningLength += 4;
-        getUnsafe().copyMemory(data2, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + runningLength, data2.length);
-        runningLength += data2.length;
-        break;
-      case STRUCT:
-        CarbonRowSchema[] childSchemas =
-            ((CarbonRowSchema.StructCarbonRowSchema) schema).getChildSchemas();
-        DataMapRow struct = row.getRow(index);
-        for (int i = 0; i < childSchemas.length; i++) {
-          addToUnsafe(childSchemas[i], struct, i);
-        }
-        break;
+        getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + startOffset + varPosition, data.length);
+        runningLength += data.length;
+        varPosition += data.length;
+        return varPosition;
       default:
         throw new UnsupportedOperationException(
             "unsupported data type for unsafe storage: " + schema.getDataType());
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 8ebd50d..4b32688 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
@@ -48,6 +47,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -131,7 +131,7 @@ public class BlockDataMap extends CoarseGrainDataMap
       filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
       isFilePathStored = true;
     }
-    byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+    byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
     byte[] segmentId =
         blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
     if (!indexInfo.isEmpty()) {
@@ -711,13 +711,16 @@ public class BlockDataMap extends CoarseGrainDataMap
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
     String filePath = getFilePath();
     int numEntries = memoryDMStore.getRowCount();
-    int totalBlocklets = getTotalBlocklets();
+    int totalBlocklets = 0;
+    if (ExplainCollector.enabled()) {
+      totalBlocklets = getTotalBlocklets();
+    }
     int hitBlocklets = 0;
     if (filterExp == null) {
       for (int i = 0; i < numEntries; i++) {
-        DataMapRow safeRow = memoryDMStore.getDataMapRow(schema, i).convertToSafeRow();
-        blocklets.add(createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath),
-            getBlockletId(safeRow), false));
+        DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
+        blocklets.add(createBlocklet(dataMapRow, getFileNameWithFilePath(dataMapRow, filePath),
+            getBlockletId(dataMapRow), false));
       }
       hitBlocklets = totalBlocklets;
     } else {
@@ -730,28 +733,31 @@ public class BlockDataMap extends CoarseGrainDataMap
       boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp);
       // min and max for executor pruning
       while (entryIndex < numEntries) {
-        DataMapRow safeRow = memoryDMStore.getDataMapRow(schema, entryIndex).convertToSafeRow();
-        boolean[] minMaxFlag = getMinMaxFlag(safeRow, BLOCK_MIN_MAX_FLAG);
-        String fileName = getFileNameWithFilePath(safeRow, filePath);
-        short blockletId = getBlockletId(safeRow);
+        DataMapRow row = memoryDMStore.getDataMapRow(schema, entryIndex);
+        boolean[] minMaxFlag = getMinMaxFlag(row, BLOCK_MIN_MAX_FLAG);
+        String fileName = getFileNameWithFilePath(row, filePath);
+        short blockletId = getBlockletId(row);
         boolean isValid =
-            addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX),
-                getMinMaxValue(safeRow, MIN_VALUES_INDEX), minMaxFlag, fileName, blockletId);
+            addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(row, MAX_VALUES_INDEX),
+                getMinMaxValue(row, MIN_VALUES_INDEX), minMaxFlag, fileName, blockletId);
         if (isValid) {
-          blocklets.add(createBlocklet(safeRow, fileName, blockletId, useMinMaxForPruning));
-          hitBlocklets += getBlockletNumOfEntry(entryIndex);
+          blocklets.add(createBlocklet(row, fileName, blockletId, useMinMaxForPruning));
+          if (ExplainCollector.enabled()) {
+            hitBlocklets += getBlockletNumOfEntry(entryIndex);
+          }
         }
         entryIndex++;
       }
     }
-
-    if (isLegacyStore) {
-      ExplainCollector.setShowPruningInfo(false);
-    } else {
-      ExplainCollector.setShowPruningInfo(true);
-      ExplainCollector.addTotalBlocklets(totalBlocklets);
-      ExplainCollector.addTotalBlocks(getTotalBlocks());
-      ExplainCollector.addDefaultDataMapPruningHit(hitBlocklets);
+    if (ExplainCollector.enabled()) {
+      if (isLegacyStore) {
+        ExplainCollector.setShowPruningInfo(false);
+      } else {
+        ExplainCollector.setShowPruningInfo(true);
+        ExplainCollector.addTotalBlocklets(totalBlocklets);
+        ExplainCollector.addTotalBlocks(getTotalBlocks());
+        ExplainCollector.addDefaultDataMapPruningHit(hitBlocklets);
+      }
     }
     return blocklets;
   }
@@ -907,10 +913,10 @@ public class BlockDataMap extends CoarseGrainDataMap
         rowIndex++;
       }
     }
-    DataMapRow safeRow =
-        memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex).convertToSafeRow();
+    DataMapRow row =
+        memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), rowIndex);
     String filePath = getFilePath();
-    return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath), relativeBlockletId,
+    return createBlocklet(row, getFileNameWithFilePath(row, filePath), relativeBlockletId,
         false);
   }
 
@@ -961,34 +967,16 @@ public class BlockDataMap extends CoarseGrainDataMap
 
   protected ExtendedBlocklet createBlocklet(DataMapRow row, String fileName, short blockletId,
       boolean useMinMaxForPruning) {
-    ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "", false);
-    BlockletDetailInfo detailInfo = getBlockletDetailInfo(row, blockletId, blocklet);
-    detailInfo.setBlockletInfoBinary(new byte[0]);
-    blocklet.setDetailInfo(detailInfo);
+    short versionNumber = row.getShort(VERSION_INDEX);
+    ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "", false,
+        ColumnarFormatVersion.valueOf(versionNumber));
+    blocklet.setDataMapRow(row);
+    blocklet.setColumnCardinality(getColumnCardinality());
+    blocklet.setLegacyStore(isLegacyStore);
+    blocklet.setUseMinMaxForPruning(useMinMaxForPruning);
     return blocklet;
   }
 
-  protected BlockletDetailInfo getBlockletDetailInfo(DataMapRow row, short blockletId,
-      ExtendedBlocklet blocklet) {
-    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
-    detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
-    detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
-    detailInfo.setBlockletId(blockletId);
-    detailInfo.setDimLens(getColumnCardinality());
-    detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
-    try {
-      blocklet.setLocation(
-          new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
-              .split(","));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
-    detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH));
-    detailInfo.setLegacyStore(isLegacyStore);
-    return detailInfo;
-  }
-
   private String[] getFileDetails() {
     try {
       String[] fileDetails = new String[3];
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 7939a17..23d39ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -30,12 +30,12 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -232,11 +232,10 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
       return super.getDetailedBlocklet(blockletId);
     }
     int absoluteBlockletId = Integer.parseInt(blockletId);
-    DataMapRow safeRow = memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), absoluteBlockletId)
-        .convertToSafeRow();
-    short relativeBlockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
+    DataMapRow row = memoryDMStore.getDataMapRow(getFileFooterEntrySchema(), absoluteBlockletId);
+    short relativeBlockletId = row.getShort(BLOCKLET_ID_INDEX);
     String filePath = getFilePath();
-    return createBlocklet(safeRow, getFileNameWithFilePath(safeRow, filePath), relativeBlockletId,
+    return createBlocklet(row, getFileNameWithFilePath(row, filePath), relativeBlockletId,
         false);
   }
 
@@ -262,13 +261,15 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
     if (isLegacyStore) {
       return super.createBlocklet(row, fileName, blockletId, useMinMaxForPruning);
     }
-    ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "");
-    BlockletDetailInfo detailInfo = getBlockletDetailInfo(row, blockletId, blocklet);
-    detailInfo.setColumnSchemas(getColumnSchema());
-    detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCKLET_INFO_INDEX));
-    detailInfo.setPagesCount(row.getShort(BLOCKLET_PAGE_COUNT_INDEX));
-    detailInfo.setUseMinMaxForPruning(useMinMaxForPruning);
-    blocklet.setDetailInfo(detailInfo);
+    short versionNumber = row.getShort(VERSION_INDEX);
+    ExtendedBlocklet blocklet = new ExtendedBlocklet(fileName, blockletId + "",
+        ColumnarFormatVersion.valueOf(versionNumber));
+    blocklet.setColumnSchema(getColumnSchema());
+    blocklet.setUseMinMaxForPruning(useMinMaxForPruning);
+    blocklet.setIsBlockCache(false);
+    blocklet.setColumnCardinality(getColumnCardinality());
+    blocklet.setLegacyStore(isLegacyStore);
+    blocklet.setDataMapRow(row);
     return blocklet;
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 5892f78..2ef7b88 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -192,7 +192,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   @Override
   public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
       throws IOException {
-    List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
+    List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(blocklets.size() + 1);
     // If it is already detailed blocklet then type cast and return same
     if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) {
       for (Blocklet blocklet : blocklets) {
@@ -379,6 +379,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return dataMap.getSegmentProperties();
   }
 
+  @Override public SegmentProperties getSegmentPropertiesFromDataMap(DataMap coarseGrainDataMap)
+      throws IOException {
+    assert (coarseGrainDataMap instanceof BlockDataMap);
+    BlockDataMap dataMap = (BlockDataMap) coarseGrainDataMap;
+    return dataMap.getSegmentProperties();
+  }
+
   @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
       throws IOException {
     List<Blocklet> blocklets = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index c0ea0a0..18adc06 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -78,6 +78,8 @@ public abstract class DataMapRow implements Serializable {
     for (int i = 0; i < schemas.length; i++) {
       len += getSizeInBytes(i);
     }
+    // for last offset in unsafe data map row
+    len += 4;
     return len;
   }
 
@@ -86,7 +88,6 @@ public abstract class DataMapRow implements Serializable {
       case FIXED:
         return schemas[ordinal].getLength();
       case VARIABLE_SHORT:
-        return getLengthInBytes(ordinal) + 2;
       case VARIABLE_INT:
         return getLengthInBytes(ordinal) + 4;
       case STRUCT:
@@ -105,15 +106,6 @@ public abstract class DataMapRow implements Serializable {
     return schemas.length;
   }
 
-  /**
-   * default implementation
-   *
-   * @return
-   */
-  public DataMapRow convertToSafeRow() {
-    return this;
-  }
-
   public void setSchemas(CarbonRowSchema[] schemas) {
     if (null == this.schemas) {
       this.schemas = schemas;
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 70f0e0d..5f6c4dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.core.indexstore.row;
 
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -47,38 +45,39 @@ public class UnsafeDataMapRow extends DataMapRow {
 
   @Override public byte[] getByteArray(int ordinal) {
     int length;
-    int position = getPosition(ordinal);
+    int currentOffset;
     switch (schemas[ordinal].getSchemaType()) {
       case VARIABLE_SHORT:
-        length = getUnsafe().getShort(block.getBaseObject(),
-            block.getBaseOffset() + pointer + position);
-        position += 2;
-        break;
       case VARIABLE_INT:
-        length = getUnsafe().getInt(block.getBaseObject(),
-            block.getBaseOffset() + pointer + position);
-        position += 4;
+        final int schemaOrdinal = schemas[ordinal].getBytePosition();
+        currentOffset = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + schemaOrdinal);
+        int nextOffset = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + schemaOrdinal + 4);
+        length = nextOffset - currentOffset;
         break;
       default:
+        currentOffset = schemas[ordinal].getBytePosition();
         length = schemas[ordinal].getLength();
     }
     byte[] data = new byte[length];
-    getUnsafe().copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
-        BYTE_ARRAY_OFFSET, data.length);
+    getUnsafe()
+        .copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + currentOffset, data,
+            BYTE_ARRAY_OFFSET, data.length);
     return data;
   }
 
   @Override public int getLengthInBytes(int ordinal) {
     int length;
-    int position = getPosition(ordinal);
+    int schemaOrdinal = schemas[ordinal].getBytePosition();
     switch (schemas[ordinal].getSchemaType()) {
       case VARIABLE_SHORT:
-        length = getUnsafe().getShort(block.getBaseObject(),
-            block.getBaseOffset() + pointer + position);
-        break;
       case VARIABLE_INT:
-        length = getUnsafe().getInt(block.getBaseObject(),
-            block.getBaseOffset() + pointer + position);
+        int currentOffset = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + schemaOrdinal);
+        int nextOffset = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + schemaOrdinal + 4);
+        length = nextOffset - currentOffset;
         break;
       default:
         length = schemas[ordinal].getLength();
@@ -91,31 +90,14 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public boolean getBoolean(int ordinal) {
-    return getUnsafe()
-        .getBoolean(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
-  }
-
-  private int getLengthInBytes(int ordinal, int position) {
-    int length;
-    switch (schemas[ordinal].getSchemaType()) {
-      case VARIABLE_SHORT:
-        length = getUnsafe().getShort(block.getBaseObject(),
-            block.getBaseOffset() + pointer + position);
-        break;
-      case VARIABLE_INT:
-        length = getUnsafe().getInt(block.getBaseObject(),
-            block.getBaseOffset() + pointer + position);
-        break;
-      default:
-        length = schemas[ordinal].getLength();
-    }
-    return length;
+    return getUnsafe().getBoolean(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public DataMapRow getRow(int ordinal) {
     CarbonRowSchema[] childSchemas =
         ((CarbonRowSchema.StructCarbonRowSchema) schemas[ordinal]).getChildSchemas();
-    return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
+    return new UnsafeDataMapRow(childSchemas, block, pointer);
   }
 
   @Override public void setByteArray(byte[] byteArray, int ordinal) {
@@ -123,8 +105,8 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public int getInt(int ordinal) {
-    return getUnsafe()
-        .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+    return getUnsafe().getInt(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public void setInt(int value, int ordinal) {
@@ -136,8 +118,8 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public byte getByte(int ordinal) {
-    return getUnsafe()
-        .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+    return getUnsafe().getByte(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public void setShort(short value, int ordinal) {
@@ -145,8 +127,8 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public short getShort(int ordinal) {
-    return getUnsafe()
-        .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+    return getUnsafe().getShort(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public void setLong(long value, int ordinal) {
@@ -154,8 +136,8 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public long getLong(int ordinal) {
-    return getUnsafe()
-        .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+    return getUnsafe().getLong(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public void setFloat(float value, int ordinal) {
@@ -163,8 +145,8 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public float getFloat(int ordinal) {
-    return getUnsafe()
-        .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+    return getUnsafe().getFloat(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public void setDouble(double value, int ordinal) {
@@ -172,146 +154,11 @@ public class UnsafeDataMapRow extends DataMapRow {
   }
 
   @Override public double getDouble(int ordinal) {
-    return getUnsafe()
-        .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+    return getUnsafe().getDouble(block.getBaseObject(),
+        block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition());
   }
 
   @Override public void setRow(DataMapRow row, int ordinal) {
     throw new UnsupportedOperationException("Not supported to set on unsafe row");
   }
-
-  /**
-   * Convert unsafe to safe row.
-   *
-   * @return
-   */
-  public DataMapRow convertToSafeRow() {
-    DataMapRowImpl row = new DataMapRowImpl(schemas);
-    int runningLength = 0;
-    for (int i = 0; i < schemas.length; i++) {
-      CarbonRowSchema schema = schemas[i];
-      switch (schema.getSchemaType()) {
-        case FIXED:
-          DataType dataType = schema.getDataType();
-          if (dataType == DataTypes.BYTE) {
-            row.setByte(
-                getUnsafe().getByte(
-                    block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.BOOLEAN) {
-            row.setBoolean(
-                getUnsafe().getBoolean(
-                    block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.SHORT) {
-            row.setShort(
-                getUnsafe().getShort(
-                    block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.INT) {
-            row.setInt(
-                getUnsafe().getInt(
-                    block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.LONG) {
-            row.setLong(
-                getUnsafe().getLong(
-                    block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.FLOAT) {
-            row.setFloat(
-                getUnsafe().getFloat(block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.DOUBLE) {
-            row.setDouble(
-                getUnsafe().getDouble(block.getBaseObject(),
-                    block.getBaseOffset() + pointer + runningLength),
-                i);
-            runningLength += schema.getLength();
-          } else if (dataType == DataTypes.BYTE_ARRAY) {
-            byte[] data = new byte[schema.getLength()];
-            getUnsafe().copyMemory(
-                block.getBaseObject(),
-                block.getBaseOffset() + pointer + runningLength,
-                data,
-                BYTE_ARRAY_OFFSET,
-                data.length);
-            row.setByteArray(data, i);
-            runningLength += data.length;
-          } else {
-            throw new UnsupportedOperationException(
-                "unsupported data type for unsafe storage: " + schema.getDataType());
-          }
-          break;
-        case VARIABLE_SHORT:
-          int length = getUnsafe()
-              .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength);
-          runningLength += 2;
-          byte[] data = new byte[length];
-          getUnsafe().copyMemory(block.getBaseObject(),
-              block.getBaseOffset() + pointer + runningLength,
-              data, BYTE_ARRAY_OFFSET, data.length);
-          runningLength += data.length;
-          row.setByteArray(data, i);
-          break;
-        case VARIABLE_INT:
-          int length2 = getUnsafe()
-              .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength);
-          runningLength += 4;
-          byte[] data2 = new byte[length2];
-          getUnsafe().copyMemory(block.getBaseObject(),
-              block.getBaseOffset() + pointer + runningLength,
-              data2, BYTE_ARRAY_OFFSET, data2.length);
-          runningLength += data2.length;
-          row.setByteArray(data2, i);
-          break;
-        case STRUCT:
-          DataMapRow structRow = ((UnsafeDataMapRow) getRow(i)).convertToSafeRow();
-          row.setRow(structRow, i);
-          runningLength += structRow.getTotalSizeInBytes();
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "unsupported data type for unsafe storage: " + schema.getDataType());
-      }
-    }
-    row.setTotalLengthInBytes(runningLength);
-
-    return row;
-  }
-
-  private int getSizeInBytes(int ordinal, int position) {
-    switch (schemas[ordinal].getSchemaType()) {
-      case FIXED:
-        return schemas[ordinal].getLength();
-      case VARIABLE_SHORT:
-        return getLengthInBytes(ordinal, position) + 2;
-      case VARIABLE_INT:
-        return getLengthInBytes(ordinal, position) + 4;
-      case STRUCT:
-        return getRow(ordinal).getTotalSizeInBytes();
-      default:
-        throw new UnsupportedOperationException("wrong type");
-    }
-  }
-
-  private int getPosition(int ordinal) {
-    int position = 0;
-    for (int i = 0; i < ordinal; i++) {
-      position += getSizeInBytes(i, position);
-    }
-    return position;
-  }
 }
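
With the byte position precomputed per schema field (see the SchemaGenerator change below), every getter above reads directly at block.getBaseOffset() + pointer + schemas[ordinal].getBytePosition(), so a field can be fetched without walking all preceding fields the way the removed getPosition() loop did. Variable-length fields keep a 4-byte data offset at their byte position, and the length is derived from the next stored offset. The following is a minimal, self-contained sketch of that layout using a plain ByteBuffer instead of the MemoryBlock/Unsafe calls in the patch; the field sizes, the single variable field, and the trailing end-offset slot are assumptions made purely for illustration.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class OffsetLayoutSketch {
      public static void main(String[] args) {
        // toy row: INT at byte position 0, LONG at 4, one variable field whose
        // 4-byte offset slot sits at 12, followed by an end-offset slot at 16
        byte[] name = "part-0-0_batchno0-0-1.carbondata".getBytes(StandardCharsets.UTF_8);
        int header = 12 + 4 + 4;
        ByteBuffer row = ByteBuffer.allocate(header + name.length);
        row.putInt(0, 42);                      // fixed INT
        row.putLong(4, 123456789L);             // fixed LONG
        row.putInt(12, header);                 // offset of the variable data
        row.putInt(16, header + name.length);   // next offset, used to derive the length
        row.position(header);
        row.put(name);

        // random access: read any field straight from its byte position
        int start = row.getInt(12);
        int length = row.getInt(16) - start;    // nextOffset - currentOffset
        byte[] data = new byte[length];
        row.position(start);
        row.get(data);
        System.out.println(row.getInt(0) + " " + row.getLong(4) + " "
            + new String(data, StandardCharsets.UTF_8));
      }
    }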
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 7f47c00..30a7a9c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -28,6 +28,7 @@ public abstract class CarbonRowSchema implements Serializable {
   private static final long serialVersionUID = -8061282029097686495L;
 
   protected DataType dataType;
+  private int bytePosition = -1;
 
   public CarbonRowSchema(DataType dataType) {
     this.dataType = dataType;
@@ -55,6 +56,13 @@ public abstract class CarbonRowSchema implements Serializable {
     return dataType.getSizeInBytes();
   }
 
+  public void setBytePosition(int bytePosition) {
+    this.bytePosition = bytePosition;
+  }
+
+  public int getBytePosition() {
+    return this.bytePosition;
+  }
   /**
    * schema type
    * @return
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index 52b9fb3..41c382b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.indexstore.schema;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -60,10 +61,77 @@ public class SchemaGenerator {
     // written in the metadata or not.
     addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
     CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    updateBytePosition(schema);
     return schema;
   }
 
   /**
+   * Method to update the byte position which will be used in case of unsafe dm store
+   * @see org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java:87
+   *
+   * @param schema
+   */
+  private static void updateBytePosition(CarbonRowSchema[] schema) {
+    int currentSize;
+    int bytePosition = 0;
+    // First assign a byte position to all the fixed length schema fields
+    for (int i = 0; i < schema.length; i++) {
+      switch (schema[i].getSchemaType()) {
+        case STRUCT:
+          CarbonRowSchema[] childSchemas =
+              ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas();
+          for (int j = 0; j < childSchemas.length; j++) {
+            currentSize = getSchemaSize(childSchemas[j]);
+            if (currentSize != -1) {
+              childSchemas[j].setBytePosition(bytePosition);
+              bytePosition += currentSize;
+            }
+          }
+          break;
+        default:
+          currentSize = getSchemaSize(schema[i]);
+          if (currentSize != -1) {
+            schema[i].setBytePosition(bytePosition);
+            bytePosition += currentSize;
+          }
+          break;
+      }
+    }
+    // then assign byte positions for the slots that store each variable length column's offset
+    for (int i = 0; i < schema.length; i++) {
+      switch (schema[i].getSchemaType()) {
+        case STRUCT:
+          CarbonRowSchema[] childSchemas =
+              ((CarbonRowSchema.StructCarbonRowSchema) schema[i]).getChildSchemas();
+          for (int j = 0; j < childSchemas.length; j++) {
+            if (childSchemas[j].getBytePosition() == -1) {
+              childSchemas[j].setBytePosition(bytePosition);
+              bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+            }
+          }
+          break;
+        default:
+          if (schema[i].getBytePosition() == -1) {
+            schema[i].setBytePosition(bytePosition);
+            bytePosition += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+          }
+          break;
+      }
+    }
+  }
+  private static int getSchemaSize(CarbonRowSchema schema) {
+    switch (schema.getSchemaType()) {
+      case FIXED:
+        return schema.getLength();
+      case VARIABLE_SHORT:
+      case VARIABLE_INT:
+        return -1;
+      default:
+        throw new UnsupportedOperationException("Invalid Type");
+    }
+  }
+
+  /**
    * Method for creating blocklet Schema. Each blocklet row will share the same schema
    *
    * @param segmentProperties
@@ -98,6 +166,7 @@ public class SchemaGenerator {
     // for relative blocklet id i.e. blocklet id that belongs to a particular part file
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
     CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    updateBytePosition(schema);
     return schema;
   }
 
@@ -140,6 +209,7 @@ public class SchemaGenerator {
     }
     CarbonRowSchema[] schema =
         taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+    updateBytePosition(schema);
     return schema;
   }
 
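
updateBytePosition is what makes the random access above possible: a first pass gives every fixed-length field its absolute position, and a second pass appends one 4-byte slot per variable-length field to hold that field's data offset. Below is a small illustrative sketch of the two passes on a toy schema; the field names and sizes are invented for the example and are not the real blocklet schema, and the map stands in for CarbonRowSchema.setBytePosition().

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class BytePositionSketch {
      public static void main(String[] args) {
        // -1 marks a variable-length field, mirroring getSchemaSize() above
        String[] fields = {"rowCount(INT)", "filePath(VAR)", "footerOffset(LONG)", "minMax(VAR)"};
        int[] sizes = {4, -1, 8, -1};
        Map<String, Integer> positions = new LinkedHashMap<>();
        int bytePosition = 0;
        // pass 1: fixed-length fields get their absolute byte position
        for (int i = 0; i < fields.length; i++) {
          if (sizes[i] != -1) {
            positions.put(fields[i], bytePosition);
            bytePosition += sizes[i];
          }
        }
        // pass 2: each variable-length field gets a 4-byte slot for its data offset
        for (int i = 0; i < fields.length; i++) {
          if (sizes[i] == -1) {
            positions.put(fields[i], bytePosition);
            bytePosition += 4; // CarbonCommonConstants.INT_SIZE_IN_BYTE
          }
        }
        // prints {rowCount(INT)=0, footerOffset(LONG)=4, filePath(VAR)=12, minMax(VAR)=16}
        System.out.println(positions);
      }
    }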
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index d6017f5..267527f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -18,19 +18,16 @@
 package org.apache.carbondata.core.scan.model;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.UnknownExpression;
@@ -92,15 +89,6 @@ public class QueryModel {
 
   private DataTypeConverter converter;
 
-  /**
-   * Invalid table blocks, which need to be removed from
-   * memory, invalid blocks can be segment which are deleted
-   * or compacted
-   */
-  private List<String> invalidSegmentIds;
-  private Map<String, UpdateVO> invalidSegmentBlockIdMap =
-      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
   private boolean[] isFilterDimensions;
   private boolean[] isFilterMeasures;
 
@@ -135,7 +123,6 @@ public class QueryModel {
 
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
-    invalidSegmentIds = new ArrayList<>();
     this.table = carbonTable;
     this.queryId = String.valueOf(System.nanoTime());
   }
@@ -350,14 +337,6 @@ public class QueryModel {
     this.statisticsRecorder = statisticsRecorder;
   }
 
-  public List<String> getInvalidSegmentIds() {
-    return invalidSegmentIds;
-  }
-
-  public void setInvalidSegmentIds(List<String> invalidSegmentIds) {
-    this.invalidSegmentIds = invalidSegmentIds;
-  }
-
   public boolean isVectorReader() {
     return vectorReader;
   }
@@ -365,15 +344,6 @@ public class QueryModel {
   public void setVectorReader(boolean vectorReader) {
     this.vectorReader = vectorReader;
   }
-  public void setInvalidBlockForSegmentId(List<UpdateVO> invalidSegmentTimestampList) {
-    for (UpdateVO anUpdateVO : invalidSegmentTimestampList) {
-      this.invalidSegmentBlockIdMap.put(anUpdateVO.getSegmentId(), anUpdateVO);
-    }
-  }
-
-  public Map<String,UpdateVO>  getInvalidBlockVOForSegmentId() {
-    return  invalidSegmentBlockIdMap;
-  }
 
   public DataTypeConverter getConverter() {
     return converter;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
similarity index 57%
rename from hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
rename to core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index bcf703c..bb1742c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -33,11 +33,13 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapRowIndexes;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -45,6 +47,7 @@ import org.apache.carbondata.hadoop.internal.index.Block;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 /**
@@ -61,10 +64,6 @@ public class CarbonInputSplit extends FileSplit
   private String bucketId;
 
   private String blockletId;
-  /*
-   * Invalid segments that need to be removed in task side index
-   */
-  private List<String> invalidSegments;
 
   /*
    * Number of BlockLets in a block
@@ -74,14 +73,6 @@ public class CarbonInputSplit extends FileSplit
   private ColumnarFormatVersion version;
 
   /**
-   * map of blocklocation and storage id
-   */
-  private Map<String, String> blockStorageIdMap =
-      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private List<UpdateVO> invalidTimestampsList;
-
-  /**
    * list of delete delta files for split
    */
   private String[] deleteDeltaFiles;
@@ -97,90 +88,115 @@ public class CarbonInputSplit extends FileSplit
    */
   private Set<Integer> validBlockletIds;
 
+  private transient DataMapRow dataMapRow;
+
+  private transient int[] columnCardinality;
+
+  private transient boolean isLegacyStore;
+
+  private transient List<ColumnSchema> columnSchema;
+
+  private transient boolean useMinMaxForPruning;
+
+  private boolean isBlockCache = true;
+
+  private String filePath;
+
+  private long start;
+
+  private long length;
+
+  private String[] location;
+
+  private transient SplitLocationInfo[] hostInfos;
+
+  private transient Path path;
+
+  private transient String blockPath;
+
   public CarbonInputSplit() {
     segment = null;
     taskId = "0";
     bucketId = "0";
     blockletId = "0";
     numberOfBlocklets = 0;
-    invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
-  private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
-      String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles,
+  private CarbonInputSplit(String segmentId, String blockletId, String filePath, long start,
+      long length, ColumnarFormatVersion version, String[] deleteDeltaFiles,
       String dataMapWritePath) {
-    super(path, start, length, locations);
-    this.segment = Segment.toSegment(segmentId);
-    String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    this.filePath = filePath;
+    this.start = start;
+    this.length = length;
+    if (null != segmentId) {
+      this.segment = Segment.toSegment(segmentId);
+    }
+    String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(this.filePath);
     if (taskNo.contains("_")) {
       taskNo = taskNo.split("_")[0];
     }
     this.taskId = taskNo;
-    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
+    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(this.filePath);
     this.blockletId = blockletId;
-    this.invalidSegments = new ArrayList<>();
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
     this.dataMapWritePath = dataMapWritePath;
   }
 
-  public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
-      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
+  public CarbonInputSplit(String segmentId, String blockletId, String filePath, long start,
+      long length, String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
       String[] deleteDeltaFiles) {
-    this(segmentId, blockletId, path, start, length, locations, version, deleteDeltaFiles, null);
+    this(segmentId, blockletId, filePath, start, length, version, deleteDeltaFiles, null);
+    this.location = locations;
     this.numberOfBlocklets = numberOfBlocklets;
   }
-
-  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      FileFormat fileFormat) {
-    super(path, start, length, locations);
+  public CarbonInputSplit(String segmentId, String filePath, long start, long length,
+      String[] locations, FileFormat fileFormat) {
+    this.filePath = filePath;
+    this.start = start;
+    this.length = length;
+    this.location = locations;
     this.segment = Segment.toSegment(segmentId);
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
     blockletId = "0";
     numberOfBlocklets = 0;
-    invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
-  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      String[] inMemoryHosts, FileFormat fileFormat) {
-    super(path, start, length, locations, inMemoryHosts);
+  public CarbonInputSplit(String segmentId, String filePath, long start, long length,
+      String[] locations, String[] inMemoryHosts, FileFormat fileFormat) {
+    this.filePath = filePath;
+    this.start = start;
+    this.length = length;
+    this.location = locations;
+    this.hostInfos = new SplitLocationInfo[inMemoryHosts.length];
+    for (int i = 0; i < inMemoryHosts.length; i++) {
+      // because N will be tiny, scanning is probably faster than a HashSet
+      boolean inMemory = false;
+      for (String inMemoryHost : inMemoryHosts) {
+        if (inMemoryHost.equals(inMemoryHosts[i])) {
+          inMemory = true;
+          break;
+        }
+      }
+      hostInfos[i] = new SplitLocationInfo(inMemoryHosts[i], inMemory);
+    }
     this.segment = Segment.toSegment(segmentId);
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
     blockletId = "0";
     numberOfBlocklets = 0;
-    invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
-  /**
-   * Constructor to initialize the CarbonInputSplit with blockStorageIdMap
-   * @param segmentId
-   * @param path
-   * @param start
-   * @param length
-   * @param locations
-   * @param numberOfBlocklets
-   * @param version
-   * @param blockStorageIdMap
-   */
-  public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
-      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
-      Map<String, String> blockStorageIdMap, String[] deleteDeltaFiles) {
-    this(segmentId, blockletId, path, start, length, locations, numberOfBlocklets, version,
-        deleteDeltaFiles);
-    this.blockStorageIdMap = blockStorageIdMap;
-  }
-
-  public static CarbonInputSplit from(String segmentId, String blockletId, FileSplit split,
-      ColumnarFormatVersion version, String dataMapWritePath) throws IOException {
-    return new CarbonInputSplit(segmentId, blockletId, split.getPath(), split.getStart(),
-        split.getLength(), split.getLocations(), version, null, dataMapWritePath);
+  public static CarbonInputSplit from(String segmentId, String blockletId, String path, long start,
+      long length, ColumnarFormatVersion version, String dataMapWritePath) throws IOException {
+    return new CarbonInputSplit(segmentId, blockletId, path, start, length, version, null,
+        dataMapWritePath);
   }
 
   public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
@@ -190,7 +206,7 @@ public class CarbonInputSplit extends FileSplit
           new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
       try {
         TableBlockInfo blockInfo =
-            new TableBlockInfo(split.getPath().toString(), split.blockletId, split.getStart(),
+            new TableBlockInfo(split.getFilePath(), split.blockletId, split.getStart(),
                 split.getSegment().toString(), split.getLocations(), split.getLength(),
                 blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
@@ -211,7 +227,7 @@ public class CarbonInputSplit extends FileSplit
         new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
     try {
       TableBlockInfo blockInfo =
-          new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.blockletId,
+          new TableBlockInfo(inputSplit.getFilePath(), inputSplit.blockletId,
               inputSplit.getStart(), inputSplit.getSegment().toString(), inputSplit.getLocations(),
               inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
               inputSplit.getDeleteDeltaFiles());
@@ -237,16 +253,13 @@ public class CarbonInputSplit extends FileSplit
 
 
   @Override public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
+    this.filePath = in.readUTF();
+    this.start = in.readLong();
+    this.length = in.readLong();
     this.segment = Segment.toSegment(in.readUTF());
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
     this.bucketId = in.readUTF();
     this.blockletId = in.readUTF();
-    int numInvalidSegment = in.readInt();
-    invalidSegments = new ArrayList<>(numInvalidSegment);
-    for (int i = 0; i < numInvalidSegment; i++) {
-      invalidSegments.add(in.readUTF());
-    }
     int numberOfDeleteDeltaFiles = in.readInt();
     deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
     for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
@@ -269,24 +282,24 @@ public class CarbonInputSplit extends FileSplit
   }
 
   @Override public void write(DataOutput out) throws IOException {
-    super.write(out);
+    out.writeUTF(filePath);
+    out.writeLong(start);
+    out.writeLong(length);
     out.writeUTF(segment.toString());
     out.writeShort(version.number());
     out.writeUTF(bucketId);
     out.writeUTF(blockletId);
-    out.writeInt(invalidSegments.size());
-    for (String invalidSegment : invalidSegments) {
-      out.writeUTF(invalidSegment);
-    }
     out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
     if (null != deleteDeltaFiles) {
       for (int i = 0; i < deleteDeltaFiles.length; i++) {
         out.writeUTF(deleteDeltaFiles[i]);
       }
     }
-    out.writeBoolean(detailInfo != null);
+    out.writeBoolean(detailInfo != null || dataMapRow != null);
     if (detailInfo != null) {
       detailInfo.write(out);
+    } else if (dataMapRow != null) {
+      writeBlockletDetailsInfo(out);
     }
     out.writeBoolean(dataMapWritePath != null);
     if (dataMapWritePath != null) {
@@ -298,26 +311,6 @@ public class CarbonInputSplit extends FileSplit
     }
   }
 
-  public List<String> getInvalidSegments() {
-    return invalidSegments;
-  }
-
-  public void setInvalidSegments(List<Segment> invalidSegments) {
-    List<String> invalidSegmentIds = new ArrayList<>();
-    for (Segment segment: invalidSegments) {
-      invalidSegmentIds.add(segment.getSegmentNo());
-    }
-    this.invalidSegments = invalidSegmentIds;
-  }
-
-  public void setInvalidTimestampRange(List<UpdateVO> invalidTimestamps) {
-    invalidTimestampsList = invalidTimestamps;
-  }
-
-  public List<UpdateVO> getInvalidTimestampRange() {
-    return invalidTimestampsList;
-  }
-
   /**
    * returns the number of blocklets
    *
@@ -351,7 +344,7 @@ public class CarbonInputSplit extends FileSplit
     // convert seg ID to double.
 
     double seg1 = Double.parseDouble(segment.getSegmentNo());
-    double seg2 = Double.parseDouble(other.getSegmentId());
+    double seg2 = Double.parseDouble(other.segment.getSegmentNo());
     if (seg1 - seg2 < 0) {
       return -1;
     }
@@ -363,8 +356,8 @@ public class CarbonInputSplit extends FileSplit
     // if both the task id of the file is same then we need to compare the
     // offset of
     // the file
-    String filePath1 = this.getPath().getName();
-    String filePath2 = other.getPath().getName();
+    String filePath1 = this.getFilePath();
+    String filePath2 = other.getFilePath();
     if (CarbonTablePath.isCarbonDataFile(filePath1)) {
       byte[] firstTaskId = CarbonTablePath.DataFileUtil.getTaskNo(filePath1)
           .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
@@ -410,13 +403,15 @@ public class CarbonInputSplit extends FileSplit
     int result = taskId.hashCode();
     result = 31 * result + segment.hashCode();
     result = 31 * result + bucketId.hashCode();
-    result = 31 * result + invalidSegments.hashCode();
     result = 31 * result + numberOfBlocklets;
     return result;
   }
 
   @Override public String getBlockPath() {
-    return getPath().getName();
+    if (null == blockPath) {
+      blockPath = getPath().getName();
+    }
+    return blockPath;
   }
 
   @Override public List<Long> getMatchedBlocklets() {
@@ -429,10 +424,11 @@ public class CarbonInputSplit extends FileSplit
 
   /**
    * returns map of blocklocation and storage id
+   *
    * @return
    */
   public Map<String, String> getBlockStorageIdMap() {
-    return blockStorageIdMap;
+    return new HashMap<>();
   }
 
   public String[] getDeleteDeltaFiles() {
@@ -443,10 +439,6 @@ public class CarbonInputSplit extends FileSplit
     this.deleteDeltaFiles = deleteDeltaFiles;
   }
 
-  public BlockletDetailInfo getDetailInfo() {
-    return detailInfo;
-  }
-
   public void setDetailInfo(BlockletDetailInfo detailInfo) {
     this.detailInfo = detailInfo;
   }
@@ -459,10 +451,6 @@ public class CarbonInputSplit extends FileSplit
     this.fileFormat = fileFormat;
   }
 
-  public Blocklet makeBlocklet() {
-    return new Blocklet(getPath().getName(), blockletId);
-  }
-
   public Set<Integer> getValidBlockletIds() {
     if (null == validBlockletIds) {
       validBlockletIds = new HashSet<>();
@@ -474,4 +462,158 @@ public class CarbonInputSplit extends FileSplit
     this.validBlockletIds = validBlockletIds;
   }
 
+  public void setDataMapWritePath(String dataMapWritePath) {
+    this.dataMapWritePath = dataMapWritePath;
+  }
+
+  public void setSegment(Segment segment) {
+    this.segment = segment;
+  }
+
+  public String getDataMapWritePath() {
+    return dataMapWritePath;
+  }
+
+  public void setDataMapRow(DataMapRow dataMapRow) {
+    this.dataMapRow = dataMapRow;
+  }
+
+  public void setColumnCardinality(int[] columnCardinality) {
+    this.columnCardinality = columnCardinality;
+  }
+
+  public void setLegacyStore(boolean legacyStore) {
+    isLegacyStore = legacyStore;
+  }
+
+  public void setColumnSchema(List<ColumnSchema> columnSchema) {
+    this.columnSchema = columnSchema;
+  }
+
+  public void setUseMinMaxForPruning(boolean useMinMaxForPruning) {
+    this.useMinMaxForPruning = useMinMaxForPruning;
+  }
+
+  public void setIsBlockCache(boolean isBlockCache) {
+    this.isBlockCache = isBlockCache;
+  }
+
+  private void writeBlockletDetailsInfo(DataOutput out) throws IOException {
+    out.writeInt(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
+    if (this.isBlockCache) {
+      out.writeShort(0);
+    } else {
+      out.writeShort(this.dataMapRow.getShort(BlockletDataMapRowIndexes.BLOCKLET_PAGE_COUNT_INDEX));
+    }
+    out.writeShort(this.dataMapRow.getShort(BlockletDataMapRowIndexes.VERSION_INDEX));
+    out.writeShort(Short.parseShort(this.blockletId));
+    out.writeShort(this.columnCardinality.length);
+    for (int i = 0; i < this.columnCardinality.length; i++) {
+      out.writeInt(this.columnCardinality[i]);
+    }
+    out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.SCHEMA_UPADATED_TIME_INDEX));
+    out.writeBoolean(false);
+    out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET));
+    // write -1 if columnSchemaBinary is null so that at the time of reading it can distinguish
+    // whether schema is written or not
+    if (null != this.columnSchema) {
+      byte[] columnSchemaBinary = BlockletDataMapUtil.convertSchemaToBinary(this.columnSchema);
+      out.writeInt(columnSchemaBinary.length);
+      out.write(columnSchemaBinary);
+    } else {
+      // write -1 if columnSchemaBinary is null so that at the time of reading it can distinguish
+      // whether schema is written or not
+      out.writeInt(-1);
+    }
+    if (this.isBlockCache) {
+      out.writeInt(0);
+      out.write(new byte[0]);
+    } else {
+      byte[] blockletInfoBinary =
+          this.dataMapRow.getByteArray(BlockletDataMapRowIndexes.BLOCKLET_INFO_INDEX);
+      out.writeInt(blockletInfoBinary.length);
+      out.write(blockletInfoBinary);
+    }
+    out.writeLong(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH));
+    out.writeBoolean(this.isLegacyStore);
+    out.writeBoolean(this.useMinMaxForPruning);
+  }
+
+  public BlockletDetailInfo getDetailInfo() {
+    if (null != dataMapRow && detailInfo == null) {
+      detailInfo = new BlockletDetailInfo();
+      detailInfo
+          .setRowCount(this.dataMapRow.getInt(BlockletDataMapRowIndexes.ROW_COUNT_INDEX));
+      detailInfo
+          .setVersionNumber(this.dataMapRow.getShort(BlockletDataMapRowIndexes.VERSION_INDEX));
+      detailInfo.setBlockletId(Short.parseShort(this.blockletId));
+      detailInfo.setDimLens(this.columnCardinality);
+      detailInfo.setSchemaUpdatedTimeStamp(
+          this.dataMapRow.getLong(BlockletDataMapRowIndexes.SCHEMA_UPADATED_TIME_INDEX));
+      detailInfo.setBlockFooterOffset(
+          this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_FOOTER_OFFSET));
+      detailInfo
+          .setBlockSize(this.dataMapRow.getLong(BlockletDataMapRowIndexes.BLOCK_LENGTH));
+      detailInfo.setLegacyStore(isLegacyStore);
+      detailInfo.setUseMinMaxForPruning(useMinMaxForPruning);
+      if (!this.isBlockCache) {
+        detailInfo.setColumnSchemas(this.columnSchema);
+        detailInfo.setPagesCount(
+            this.dataMapRow.getShort(BlockletDataMapRowIndexes.BLOCKLET_PAGE_COUNT_INDEX));
+        detailInfo.setBlockletInfoBinary(
+            this.dataMapRow.getByteArray(BlockletDataMapRowIndexes.BLOCKLET_INFO_INDEX));
+      } else {
+        detailInfo.setBlockletInfoBinary(new byte[0]);
+      }
+      if (location == null) {
+        try {
+          location = new String(dataMapRow.getByteArray(BlockletDataMapRowIndexes.LOCATIONS),
+              CarbonCommonConstants.DEFAULT_CHARSET).split(",");
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      dataMapRow = null;
+    }
+    return detailInfo;
+  }
+
+  @Override
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return hostInfos;
+  }
+
+  /**
+   * The file containing this split's data.
+   */
+  public Path getPath() {
+    if (path == null) {
+      path = new Path(filePath);
+      return path;
+    }
+    return path;
+  }
+
+  public String getFilePath() {
+    return this.filePath;
+  }
+
+  /** The position of the first byte in the file to process. */
+  public long getStart() { return start; }
+
+  @Override
+  public long getLength() { return length; }
+
+  @Override
+  public String toString() { return filePath + ":" + start + "+" + length; }
+
+  @Override public String[] getLocations() throws IOException {
+    if (this.location == null && dataMapRow == null) {
+      return new String[] {};
+    } else if (dataMapRow != null) {
+      location = new String(dataMapRow.getByteArray(BlockletDataMapRowIndexes.LOCATIONS),
+          CarbonCommonConstants.DEFAULT_CHARSET).split(",");
+    }
+    return this.location;
+  }
 }
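
The net effect in CarbonInputSplit is lazy decoding: during pruning the split carries only the transient dataMapRow, and BlockletDetailInfo is materialized (or serialized directly in writeBlockletDetailsInfo) the first time it is actually needed, after which the row reference is dropped. Below is a bare-bones sketch of that decode-on-first-access pattern; the Supplier and String stand-ins are placeholders for the unsafe-backed DataMapRow and BlockletDetailInfo and are not part of the CarbonData API.

    import java.util.function.Supplier;

    public class LazyDetailSketch {
      private Supplier<String> dataMapRow;   // stand-in for the unsafe-backed row
      private String detailInfo;             // stand-in for BlockletDetailInfo

      LazyDetailSketch(Supplier<String> dataMapRow) {
        this.dataMapRow = dataMapRow;
      }

      String getDetailInfo() {
        if (detailInfo == null && dataMapRow != null) {
          detailInfo = dataMapRow.get();     // decode once, on first access
          dataMapRow = null;                 // drop the row so it can be reclaimed
        }
        return detailInfo;
      }

      public static void main(String[] args) {
        LazyDetailSketch split = new LazyDetailSketch(() -> "decoded blocklet details");
        // nothing is decoded while the split merely flows through pruning
        System.out.println(split.getDetailInfo());  // decoded here, on demand
      }
    }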
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java b/core/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java
similarity index 100%
rename from hadoop/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java
rename to core/src/main/java/org/apache/carbondata/hadoop/internal/ObjectArrayWritable.java
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java b/core/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java
similarity index 100%
rename from hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java
rename to core/src/main/java/org/apache/carbondata/hadoop/internal/index/Block.java
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 0b991cb..4c99c4f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -100,7 +100,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
 
   public void calculateLength() {
     long total = 0;
-    if (splitList.size() > 0 && splitList.get(0).getDetailInfo() != null) {
+    if (splitList.size() > 1 && splitList.get(0).getDetailInfo() != null) {
       Map<String, Long> blockSizes = new HashMap<>();
       for (CarbonInputSplit split : splitList) {
         blockSizes.put(split.getBlockPath(), split.getDetailInfo().getBlockSize());
@@ -116,11 +116,21 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
     length = total;
   }
 
-  @Override
-  public String[] getLocations() {
+  @Override public String[] getLocations() {
+    getLocationIfNull();
     return locations;
   }
 
+  private void getLocationIfNull() {
+    try {
+      if (locations == null && splitList.size() == 1) {
+        this.locations = this.splitList.get(0).getLocations();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     // write number of splits and then write all splits
@@ -128,6 +138,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
     for (CarbonInputSplit split: splitList) {
       split.write(out);
     }
+    getLocationIfNull();
     out.writeInt(locations.length);
     for (int i = 0; i < locations.length; i++) {
       out.writeUTF(locations[i]);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 1dfead3..1a529e3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -81,7 +81,7 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       splitList = new ArrayList<>(1);
-      String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
+      String splitPath = ((CarbonInputSplit) inputSplit).getFilePath();
       // BlockFooterOffSet will be null in case of CarbonVectorizedReader as this has to be set
       // where multiple threads are able to read small set of files to calculate footer instead
       // of the main thread setting this for all the files.
@@ -162,7 +162,7 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     if (!skipClearDataMapAtClose) {
       // Clear the datamap cache
       DataMapStoreManager.getInstance().clearDataMaps(
-          queryModel.getTable().getAbsoluteTableIdentifier());
+          queryModel.getTable().getAbsoluteTableIdentifier(), false);
     }
     // close read support
     readSupport.close();
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 7c08dd9..d81b02c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -48,7 +48,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 
@@ -167,7 +166,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
           // Segment id is set to null because SDK does not write carbondata files with respect
           // to segments. So no specific name is present for this load.
           CarbonInputSplit split =
-              new CarbonInputSplit("null", new Path(carbonFile.getAbsolutePath()), 0,
+              new CarbonInputSplit("null", carbonFile.getAbsolutePath(), 0,
                   carbonFile.getLength(), carbonFile.getLocations(), FileFormat.COLUMNAR_V3);
           split.setVersion(ColumnarFormatVersion.V3);
           BlockletDetailInfo info = new BlockletDetailInfo();
@@ -179,7 +178,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         }
         Collections.sort(splits, new Comparator<InputSplit>() {
           @Override public int compare(InputSplit o1, InputSplit o2) {
-            return ((CarbonInputSplit) o1).getPath().compareTo(((CarbonInputSplit) o2).getPath());
+            return ((CarbonInputSplit) o1).getFilePath()
+                .compareTo(((CarbonInputSplit) o2).getFilePath());
           }
         });
       }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 26144e2..aba0ab7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -21,7 +21,14 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -38,13 +45,11 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -80,7 +85,6 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.log4j.Logger;
 
@@ -408,7 +412,6 @@ m filterExpression
         new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets =
         getPrunedBlocklets(job, carbonTable, expression, segmentIds);
-
     List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
     int partitionIndex = 0;
     List<Integer> partitionIdList = new ArrayList<>();
@@ -416,13 +419,13 @@ m filterExpression
       partitionIdList = partitionInfo.getPartitionIds();
     }
     for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      long partitionId = CarbonTablePath.DataFileUtil
-          .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
 
       // OldPartitionIdList is only used in alter table partition command because it change
       // partition info first and then read data.
       // For other normal query should use newest partitionIdList
       if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+        long partitionId = CarbonTablePath.DataFileUtil
+            .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
         if (oldPartitionIdList != null) {
           partitionIndex = oldPartitionIdList.indexOf((int) partitionId);
         } else {
@@ -436,10 +439,7 @@ m filterExpression
         // for partition table, the task id of carbaondata file name is the partition id.
         // if this partition is not required, here will skip it.
         if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
-          CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
-          if (inputSplit != null) {
-            resultFilteredBlocks.add(inputSplit);
-          }
+          resultFilteredBlocks.add(blocklet.getInputSplit());
         }
       }
     }
@@ -493,7 +493,9 @@ m filterExpression
       prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune);
     }
 
-    ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
+    if (ExplainCollector.enabled()) {
+      ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets));
+    }
 
     if (prunedBlocklets.size() == 0) {
       return prunedBlocklets;
@@ -577,7 +579,7 @@ m filterExpression
       segment.getFilteredIndexShardNames().clear();
       // Check the segment exist in any of the pruned blocklets.
       for (ExtendedBlocklet blocklet : prunedBlocklets) {
-        if (blocklet.getSegmentId().equals(segment.toString())) {
+        if (blocklet.getSegment().toString().equals(segment.toString())) {
           found = true;
           // Set the pruned index file to the segment for further pruning.
           String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
@@ -593,17 +595,6 @@ m filterExpression
     segments.removeAll(toBeRemovedSegments);
   }
 
-  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
-    CarbonInputSplit split = CarbonInputSplit
-        .from(blocklet.getSegmentId(), blocklet.getBlockletId(),
-            new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
-                blocklet.getLocations()),
-            ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
-            blocklet.getDataMapWriterPath());
-    split.setDetailInfo(blocklet.getDetailInfo());
-    return split;
-  }
-
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
@@ -639,20 +630,6 @@ m filterExpression
         .filterExpression(filterExpression)
         .dataConverter(getDataTypeConverter(configuration))
         .build();
-
-    // update the file level index store if there are invalid segment
-    if (inputSplit instanceof CarbonMultiBlockSplit) {
-      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
-      List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
-      if (invalidSegments.size() > 0) {
-        queryModel.setInvalidSegmentIds(invalidSegments);
-      }
-      List<UpdateVO> invalidTimestampRangeList =
-          split.getAllSplits().get(0).getInvalidTimestampRange();
-      if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
-        queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
-      }
-    }
     return queryModel;
   }
 
@@ -672,7 +649,7 @@ m filterExpression
       for (CarbonInputSplit carbonInputSplit : splits) {
         Set<Integer> validBlockletIds = carbonInputSplit.getValidBlockletIds();
         if (null != validBlockletIds && !validBlockletIds.isEmpty()) {
-          String uniqueBlockPath = carbonInputSplit.getPath().toString();
+          String uniqueBlockPath = carbonInputSplit.getFilePath();
           String shortBlockPath = CarbonTablePath
               .getShortBlockId(uniqueBlockPath.substring(uniqueBlockPath.lastIndexOf("/Part") + 1));
           blockIdToBlockletIdMapping.put(shortBlockPath, validBlockletIds);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 4ba8b8c..a7ca290 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -141,7 +141,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
     List<Segment> invalidSegments = new ArrayList<>();
-    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
@@ -179,7 +178,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       }
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
-      invalidTimestampsList.addAll(updateStatusManager.getInvalidTimestampRange());
       if (invalidSegments.size() > 0) {
         DataMapStoreManager.getInstance()
             .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()), invalidSegments);
@@ -219,15 +217,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<InputSplit> splits =
         getSplits(job, filter, filteredSegmentToAccess, matchedPartitions, partitionInfo,
             null, updateStatusManager);
-    // pass the invalid segment to task side in order to remove index entry in task side
-    if (invalidSegments.size() > 0) {
-      for (InputSplit split : splits) {
-        ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments);
-        ((org.apache.carbondata.hadoop.CarbonInputSplit) split)
-            .setInvalidTimestampRange(invalidTimestampsList);
-      }
-    }
-
     // add all splits of streaming
     List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable);
     if (!splitsOfStreaming.isEmpty()) {
@@ -320,13 +309,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
           }
         }
       }
-      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) {
-        for (Segment segment : segmentToAccessSet) {
-          if (!filteredSegmentToAccess.contains(segment)) {
-            filteredSegmentToAccess.add(segment);
-          }
-        }
-      }
       if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
         List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess);
         filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
@@ -383,16 +365,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
           // there is 10% slop to avoid to generate very small split in the end
           while (((double) bytesRemaining) / splitSize > 1.1) {
             int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-            splits.add(
-                makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining,
-                    splitSize, blkLocations[blkIndex].getHosts(),
+            splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(),
+                length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
             bytesRemaining -= splitSize;
           }
           if (bytesRemaining != 0) {
             int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-            splits.add(makeSplit(streamFile.getSegmentNo(), path, length - bytesRemaining,
-                bytesRemaining, blkLocations[blkIndex].getHosts(),
+            splits.add(makeSplit(streamFile.getSegmentNo(), streamFile.getFilePath(),
+                length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(),
                 blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
           }
         }
@@ -401,15 +382,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     return splits;
   }
 
-  protected FileSplit makeSplit(String segmentId, Path file, long start, long length,
-      String[] hosts, FileFormat fileFormat) {
-    return new CarbonInputSplit(segmentId, file, start, length, hosts, fileFormat);
-  }
-
-
-  protected FileSplit makeSplit(String segmentId, Path file, long start, long length,
+  protected FileSplit makeSplit(String segmentId, String filePath, long start, long length,
       String[] hosts, String[] inMemoryHosts, FileFormat fileFormat) {
-    return new CarbonInputSplit(segmentId, file, start, length, hosts, inMemoryHosts, fileFormat);
+    return new CarbonInputSplit(segmentId, filePath, start, length, hosts, inMemoryHosts,
+        fileFormat);
   }
 
   /**
@@ -421,10 +397,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    */
   public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment,
       List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) {
-    List<Segment> invalidSegments = new ArrayList<>();
-    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
-
-
     try {
       carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       ReadCommittedScope readCommittedScope =
@@ -464,13 +436,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       // do block filtering and get split
       List<InputSplit> splits = getSplits(job, filter, segmentList, matchedPartitions,
           partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable));
-      // pass the invalid segment to task side in order to remove index entry in task side
-      if (invalidSegments.size() > 0) {
-        for (InputSplit split : splits) {
-          ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
-          ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList);
-        }
-      }
       return splits;
     } catch (IOException e) {
       throw new RuntimeException("Can't get splits of the target segment ", e);
@@ -541,7 +506,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         // In case IUD is not performed in this table avoid searching for
         // invalidated blocks.
         if (CarbonUtil
-            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+            .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getFilePath(),
                 invalidBlockVOForSegmentId, updateStatusManager)) {
           continue;
         }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index e18a4d4..1c11275 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -81,7 +81,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     List<CarbonInputSplit> splitList;
     if (inputSplit instanceof CarbonInputSplit) {
       // Read the footer offset and set.
-      String splitPath = ((CarbonInputSplit) inputSplit).getPath().toString();
+      String splitPath = ((CarbonInputSplit) inputSplit).getFilePath();
       if (((CarbonInputSplit) inputSplit).getDetailInfo().getBlockFooterOffset() == 0L) {
         FileReader reader = FileFactory.getFileHolder(FileFactory.getFileType(splitPath),
             taskAttemptContext.getConfiguration());
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index f4f50a5..f68234c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -29,8 +29,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.gson.Gson;
 
-import org.apache.hadoop.fs.Path;
-
 /**
  * CarbonLocalInputSplit represents a block, it contains a set of blocklet.
  */
@@ -136,7 +134,7 @@ public class CarbonLocalInputSplit {
 
   public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
     CarbonInputSplit inputSplit = new CarbonInputSplit(carbonLocalInputSplit.getSegmentId(),
-        carbonLocalInputSplit.getBlockletId(), new Path(carbonLocalInputSplit.getPath()),
+        carbonLocalInputSplit.getBlockletId(), carbonLocalInputSplit.getPath(),
         carbonLocalInputSplit.getStart(), carbonLocalInputSplit.getLength(),
         carbonLocalInputSplit.getLocations()
             .toArray(new String[carbonLocalInputSplit.getLocations().size()]),
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
index d1193f5..20a2d39 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -52,7 +52,8 @@ public class Util {
    */
   public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splitList) {
     for (CarbonInputSplit inputSplit : splitList) {
-      if (null == inputSplit.getDetailInfo().getBlockletInfo()) {
+      if (null == inputSplit.getDetailInfo() || null == inputSplit.getDetailInfo()
+          .getBlockletInfo()) {
         return true;
       }
     }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 96d288f..0e44f6d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -330,11 +330,11 @@ class CarbonMergerRDD[K, V](
           .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
       }
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
-        val blockInfo = new TableBlockInfo(entry.getPath.toString,
+        val blockInfo = new TableBlockInfo(entry.getFilePath,
           entry.getStart, entry.getSegmentId,
           entry.getLocations, entry.getLength, entry.getVersion,
           updateStatusManager.getDeleteDeltaFilePath(
-            entry.getPath.toString,
+            entry.getFilePath,
             Segment.toSegment(entry.getSegmentId).getSegmentNo)
         )
         (!updated || (updated && (!CarbonUtil
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0ab6a3a..9e66139 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -344,13 +344,12 @@ class CarbonScanRDD[T: ClassTag](
           closePartition()
         } else {
           // Use block distribution
-          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy { f =>
-            f.getSegmentId.concat(f.getBlockPath)
-          }.values.zipWithIndex.foreach { splitWithIndex =>
+          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).zipWithIndex.foreach {
+            splitWithIndex =>
             val multiBlockSplit =
               new CarbonMultiBlockSplit(
-                splitWithIndex._1.asJava,
-                splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
+                Seq(splitWithIndex._1).asJava,
+                null)
             val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
             result.add(partition)
           }
@@ -704,7 +703,7 @@ class CarbonScanRDD[T: ClassTag](
             }.asInstanceOf[java.util.List[CarbonInputSplit]]
             // for each split and given block path set all the valid blocklet ids
             splitList.asScala.map { split =>
-              val uniqueBlockPath = split.getPath.toString
+              val uniqueBlockPath = split.getFilePath
               val shortBlockPath = CarbonTablePath
                 .getShortBlockId(uniqueBlockPath
                   .substring(uniqueBlockPath.lastIndexOf("/Part") + 1))
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index f725de3..6819a4c 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -383,7 +383,7 @@ class SparkCarbonFileFormat extends FileFormat
 
       if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
         val split = new CarbonInputSplit("null",
-          new Path(new URI(file.filePath)),
+          new Path(new URI(file.filePath)).toString,
           file.start,
           file.length,
           file.locations,
@@ -394,10 +394,10 @@ class SparkCarbonFileFormat extends FileFormat
         split.setDetailInfo(info)
         info.setBlockSize(file.length)
         // Read the footer offset and set.
-        val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getPath.toString),
+        val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getFilePath),
           broadcastedHadoopConf.value.value)
         val buffer = reader
-          .readByteBuffer(FileFactory.getUpdatedFilePath(split.getPath.toString),
+          .readByteBuffer(FileFactory.getUpdatedFilePath(split.getFilePath),
             file.length - 8,
             8)
         info.setBlockFooterOffset(buffer.getLong)
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index f6e5eab..4010ef6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -7,7 +7,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{CarbonEnv, SaveMode}
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Ignore}
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonV3DataFormatConstants}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager

