carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2645] Segregate block and blocklet cache
Date Fri, 06 Jul 2018 06:19:44 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 19a99e15e -> 61187115c


http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 4b5b36b..eebd288 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -16,155 +16,102 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Comparator;
 import java.util.List;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
-import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
-import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
-import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.indexstore.schema.SchemaGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.profiler.ExplainCollector;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.xerial.snappy.Snappy;
 
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap extends CoarseGrainDataMap implements Serializable {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(BlockletDataMap.class.getName());
+public class BlockletDataMap extends BlockDataMap implements Serializable {
 
   private static final long serialVersionUID = -2170289352240810993L;
 
-  private static int KEY_INDEX = 0;
-
-  private static int MIN_VALUES_INDEX = 1;
-
-  private static int MAX_VALUES_INDEX = 2;
-
-  private static int ROW_COUNT_INDEX = 3;
-
-  private static int FILE_PATH_INDEX = 4;
-
-  private static int PAGE_COUNT_INDEX = 5;
-
-  private static int VERSION_INDEX = 6;
-
-  private static int SCHEMA_UPADATED_TIME_INDEX = 7;
-
-  private static int BLOCK_INFO_INDEX = 8;
-
-  private static int BLOCK_FOOTER_OFFSET = 9;
-
-  private static int LOCATIONS = 10;
-
-  private static int BLOCKLET_ID_INDEX = 11;
-
-  private static int BLOCK_LENGTH = 12;
-
-  private static int TASK_MIN_VALUES_INDEX = 0;
-
-  private static int TASK_MAX_VALUES_INDEX = 1;
-
-  private static int SCHEMA = 2;
-
-  private static int INDEX_PATH = 3;
-
-  private static int INDEX_FILE_NAME = 4;
-
-  private static int SEGMENTID = 5;
-
-  private AbstractMemoryDMStore memoryDMStore;
-
-  private AbstractMemoryDMStore summaryDMStore;
+  @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+    super.init(dataMapModel);
+  }
 
-  // As it is a heavy object it is not recommended to serialize this object
-  private transient SegmentProperties segmentProperties;
+  /**
+   * Method to check the cache level and load metadata based on that information
+   *
+   * @param blockletDataMapInfo
+   * @param indexInfo
+   * @throws IOException
+   * @throws MemoryException
+   */
+  protected DataMapRowImpl loadMetadata(BlockletDataMapModel blockletDataMapInfo,
+      List<DataFileFooter> indexInfo) throws IOException, MemoryException {
+    if (isLegacyStore) {
+      return loadBlockInfoForOldStore(blockletDataMapInfo, indexInfo);
+    } else {
+      return loadBlockletMetaInfo(blockletDataMapInfo, indexInfo);
+    }
+  }
 
-  private int[] columnCardinality;
+  /**
+   * Method to create blocklet schema
+   *
+   * @param segmentProperties
+   * @param addToUnsafe
+   * @throws MemoryException
+   */
+  protected void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
+      throws MemoryException {
+    CarbonRowSchema[] schema = SchemaGenerator.createBlockletSchema(segmentProperties);
+    memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
+  }
 
-  private long blockletSchemaTime;
+  /**
+   * Creates the schema to store summary information or the information which can be stored only
+   * once per datamap. It stores datamap level max/min of each column and partition information of
+   * datamap
+   *
+   * @param segmentProperties
+   * @throws MemoryException
+   */
+  protected void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
+      byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
+      throws MemoryException {
+    CarbonRowSchema[] taskSummarySchema = SchemaGenerator
+        .createTaskSummarySchema(segmentProperties, schemaBinary, filePath, fileName, segmentId,
+            false);
+    taskSummaryDMStore = getMemoryDMStore(taskSummarySchema, addToUnsafe);
+  }
 
-  @Override
-  public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
-    long startTime = System.currentTimeMillis();
-    assert (dataMapModel instanceof BlockletDataMapModel);
-    BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    List<DataFileFooter> indexInfo = fileFooterConverter
-        .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
-    Path path = new Path(blockletDataMapInfo.getFilePath());
-    byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
-    byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
-    byte[] segmentId =
-        blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
-    DataMapRowImpl summaryRow = null;
-    byte[] schemaBinary = null;
-    // below 2 variables will be used for fetching the relative blocklet id. Relative blocklet ID
-    // is id assigned to a blocklet within a part file
+  /**
+   * Method to load blocklet metadata information
+   *
+   * @param blockletDataMapInfo
+   * @param indexInfo
+   * @throws IOException
+   * @throws MemoryException
+   */
+  private DataMapRowImpl loadBlockletMetaInfo(BlockletDataMapModel blockletDataMapInfo,
+      List<DataFileFooter> indexInfo) throws IOException, MemoryException {
     String tempFilePath = null;
+    DataMapRowImpl summaryRow = null;
+    // Relative blocklet ID is the id assigned to a blocklet within a part file
     int relativeBlockletId = 0;
     for (DataFileFooter fileFooter : indexInfo) {
-      if (segmentProperties == null) {
-        List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
-        schemaBinary = convertSchemaToBinary(columnInTable);
-        blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp();
-        columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-        segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        createSchema(segmentProperties, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
-        createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
-            segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
-      }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
           blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
@@ -173,44 +120,21 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Serializable
       // merge index but related carbondata files are deleted. In that case we first check whether
       // the file exists physically or not
       if (blockMetaInfo != null) {
-        if (fileFooter.getBlockletList() == null) {
-          // This is old store scenario, here blocklet information is not available in index file so
-          // load only block info
-          summaryRow =
-              loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
-                  blockMetaInfo);
-        } else {
-          // blocklet ID will start from 0 again only when part file path is changed
-          if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) {
-            tempFilePath = blockInfo.getFilePath();
-            relativeBlockletId = 0;
-          }
-          summaryRow =
-              loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
-                  blockMetaInfo, relativeBlockletId);
-          // this is done because relative blocklet id need to be incremented based on the
-          // total number of blocklets
-          relativeBlockletId += fileFooter.getBlockletList().size();
+        // this case is for CACHE_LEVEL = BLOCKLET
+        // blocklet ID will start from 0 again only when part file path is changed
+        if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) {
+          tempFilePath = blockInfo.getFilePath();
+          relativeBlockletId = 0;
         }
+        summaryRow =
+            loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow,
+                blockMetaInfo, relativeBlockletId);
+        // this is done because relative blocklet id need to be incremented based on the
+        // total number of blocklets
+        relativeBlockletId += fileFooter.getBlockletList().size();
       }
     }
-    if (memoryDMStore != null) {
-      memoryDMStore.finishWriting();
-    }
-    if (null != summaryDMStore) {
-      addTaskSummaryRowToUnsafeMemoryStore(
-          summaryRow,
-          schemaBinary,
-          filePath,
-          fileName,
-          segmentId);
-      summaryDMStore.finishWriting();
-    }
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
-              + (System.currentTimeMillis() - startTime));
-    }
+    return summaryRow;
   }
 
   private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
@@ -221,839 +145,81 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Serializable
     CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (!blockletList.isEmpty() && summaryRow == null) {
-      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(taskSummaryDMStore.getSchema());
     }
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
       int ordinal = 0;
       int taskMinMaxOrdinal = 0;
       BlockletInfo blockletInfo = blockletList.get(index);
-
-      // add start key as index key
-      row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
-
       BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
-      byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
-      row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMinValues()), ordinal);
       // compute and set task level min values
-      addTaskMinMaxValues(summaryRow, minMaxLen,
-          summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
-          TASK_MIN_VALUES_INDEX, true);
+      addTaskMinMaxValues(summaryRow, minMaxLen, taskSummaryDMStore.getSchema(), taskMinMaxOrdinal,
+          minMaxIndex.getMinValues(), TASK_MIN_VALUES_INDEX, true);
       ordinal++;
       taskMinMaxOrdinal++;
-      byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
-      row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minMaxIndex.getMaxValues()), ordinal);
       // compute and set task level max values
-      addTaskMinMaxValues(summaryRow, minMaxLen,
-          summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
-          TASK_MAX_VALUES_INDEX, false);
+      addTaskMinMaxValues(summaryRow, minMaxLen, taskSummaryDMStore.getSchema(), taskMinMaxOrdinal,
+          minMaxIndex.getMaxValues(), TASK_MAX_VALUES_INDEX, false);
       ordinal++;
-
       row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
-
       // add file path
       byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
       row.setByteArray(filePathBytes, ordinal++);
-
-      // add pages
-      row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
-
       // add version number
       row.setShort(fileFooter.getVersionId().number(), ordinal++);
-
       // add schema updated time
       row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
-
-      // add blocklet info
       byte[] serializedData;
       try {
+        // Add block footer offset, it is used if we need to read footer of block
+        row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
+        setLocations(blockMetaInfo.getLocationInfo(), row, ordinal++);
+        // Store block size
+        row.setLong(blockMetaInfo.getSize(), ordinal++);
+        // add blocklet info
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         DataOutput dataOutput = new DataOutputStream(stream);
         blockletInfo.write(dataOutput);
         serializedData = stream.toByteArray();
         row.setByteArray(serializedData, ordinal++);
-        // Add block footer offset, it is used if we need to read footer of block
-        row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
-        setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
-        ordinal++;
-        // for relative blockelt id i.e blocklet id that belongs to a particular part file
-        row.setShort((short) relativeBlockletId++, ordinal++);
-        // Store block size
-        row.setLong(blockMetaInfo.getSize(), ordinal);
+        // add pages
+        row.setShort((short) blockletInfo.getNumberOfPages(), ordinal++);
+        // for relative blocklet id i.e blocklet id that belongs to a particular carbondata file
+        row.setShort((short) relativeBlockletId++, ordinal);
         memoryDMStore.addIndexRow(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
     }
-
-    return summaryRow;
-  }
-
-  private void setLocations(String[] locations, DataMapRow row, int ordinal)
-      throws UnsupportedEncodingException {
-    // Add location info
-    String locationStr = StringUtils.join(locations, ',');
-    row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), ordinal);
-  }
-
-  /**
-   * Load information for the block.It is the case can happen only for old stores
-   * where blocklet information is not available in index file. So load only block information
-   * and read blocklet information in executor.
-   */
-  private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
-      SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow,
-      BlockMetaInfo blockMetaInfo) {
-    int[] minMaxLen = segmentProperties.getColumnsValueSize();
-    BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
-    CarbonRowSchema[] schema = memoryDMStore.getSchema();
-    // Add one row to maintain task level min max for segment pruning
-    if (summaryRow == null) {
-      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
-    }
-    DataMapRow row = new DataMapRowImpl(schema);
-    int ordinal = 0;
-    int taskMinMaxOrdinal = 0;
-    // add start key as index key
-    row.setByteArray(blockletIndex.getBtreeIndex().getStartKey(), ordinal++);
-
-    BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex();
-    byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
-    byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
-    // update min max values in case of old store
-    byte[][] updatedMinValues =
-        CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true);
-    byte[][] updatedMaxValues =
-        CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false);
-    row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
-    // compute and set task level min values
-    addTaskMinMaxValues(summaryRow, minMaxLen,
-        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
-        TASK_MIN_VALUES_INDEX, true);
-    ordinal++;
-    taskMinMaxOrdinal++;
-    row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
-    // compute and set task level max values
-    addTaskMinMaxValues(summaryRow, minMaxLen,
-        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
-        TASK_MAX_VALUES_INDEX, false);
-    ordinal++;
-
-    row.setInt((int)fileFooter.getNumberOfRows(), ordinal++);
-
-    // add file path
-    byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
-    row.setByteArray(filePathBytes, ordinal++);
-
-    // add pages
-    row.setShort((short) 0, ordinal++);
-
-    // add version number
-    row.setShort(fileFooter.getVersionId().number(), ordinal++);
-
-    // add schema updated time
-    row.setLong(fileFooter.getSchemaUpdatedTimeStamp(), ordinal++);
-
-    // add blocklet info
-    row.setByteArray(new byte[0], ordinal++);
-
-    row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++);
-    try {
-      setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
-      ordinal++;
-      // for relative blocklet id. Value is -1 because in case of old store blocklet info will
-      // not be present in the index file and in that case we will not knwo the total number of
-      // blocklets
-      row.setShort((short) -1, ordinal++);
-
-      // store block size
-      row.setLong(blockMetaInfo.getSize(), ordinal);
-      memoryDMStore.addIndexRow(row);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
     return summaryRow;
   }
 
-  private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary,
-      byte[] filePath, byte[] fileName, byte[] segmentId) {
-    // write the task summary info to unsafe memory store
-    if (null != summaryRow) {
-      // Add column schema , it is useful to generate segment properties in executor.
-      // So we no need to read footer again there.
-      if (schemaBinary != null) {
-        summaryRow.setByteArray(schemaBinary, SCHEMA);
-      }
-      summaryRow.setByteArray(filePath, INDEX_PATH);
-      summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
-      summaryRow.setByteArray(segmentId, SEGMENTID);
-      try {
-        summaryDMStore.addIndexRow(summaryRow);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * Fill the measures min values with minimum , this is needed for backward version compatability
-   * as older versions don't store min values for measures
-   */
-  private byte[][] updateMinValues(byte[][] minValues, int[] minMaxLen) {
-    byte[][] updatedValues = minValues;
-    if (minValues.length < minMaxLen.length) {
-      updatedValues = new byte[minMaxLen.length][];
-      System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
-      List<CarbonMeasure> measures = segmentProperties.getMeasures();
-      ByteBuffer buffer = ByteBuffer.allocate(8);
-      for (int i = 0; i < measures.size(); i++) {
-        buffer.rewind();
-        DataType dataType = measures.get(i).getDataType();
-        if (dataType == DataTypes.BYTE) {
-          buffer.putLong(Byte.MIN_VALUE);
-          updatedValues[minValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.SHORT) {
-          buffer.putLong(Short.MIN_VALUE);
-          updatedValues[minValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.INT) {
-          buffer.putLong(Integer.MIN_VALUE);
-          updatedValues[minValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.LONG) {
-          buffer.putLong(Long.MIN_VALUE);
-          updatedValues[minValues.length + i] = buffer.array().clone();
-        } else if (DataTypes.isDecimal(dataType)) {
-          updatedValues[minValues.length + i] =
-              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
-        } else {
-          buffer.putDouble(Double.MIN_VALUE);
-          updatedValues[minValues.length + i] = buffer.array().clone();
-        }
-      }
-    }
-    return updatedValues;
-  }
-
-  /**
-   * Fill the measures max values with maximum , this is needed for backward version compatability
-   * as older versions don't store max values for measures
-   */
-  private byte[][] updateMaxValues(byte[][] maxValues, int[] minMaxLen) {
-    byte[][] updatedValues = maxValues;
-    if (maxValues.length < minMaxLen.length) {
-      updatedValues = new byte[minMaxLen.length][];
-      System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
-      List<CarbonMeasure> measures = segmentProperties.getMeasures();
-      ByteBuffer buffer = ByteBuffer.allocate(8);
-      for (int i = 0; i < measures.size(); i++) {
-        buffer.rewind();
-        DataType dataType = measures.get(i).getDataType();
-        if (dataType == DataTypes.BYTE) {
-          buffer.putLong(Byte.MAX_VALUE);
-          updatedValues[maxValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.SHORT) {
-          buffer.putLong(Short.MAX_VALUE);
-          updatedValues[maxValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.INT) {
-          buffer.putLong(Integer.MAX_VALUE);
-          updatedValues[maxValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.LONG) {
-          buffer.putLong(Long.MAX_VALUE);
-          updatedValues[maxValues.length + i] = buffer.array().clone();
-        } else if (DataTypes.isDecimal(dataType)) {
-          updatedValues[maxValues.length + i] =
-              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
-        } else {
-          buffer.putDouble(Double.MAX_VALUE);
-          updatedValues[maxValues.length + i] = buffer.array().clone();
-        }
-      }
-    }
-    return updatedValues;
-  }
-
-  private DataMapRow addMinMax(int[] minMaxLen, CarbonRowSchema carbonRowSchema,
-      byte[][] minValues) {
-    CarbonRowSchema[] minSchemas =
-        ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
-    DataMapRow minRow = new DataMapRowImpl(minSchemas);
-    int minOrdinal = 0;
-    // min value adding
-    for (int i = 0; i < minMaxLen.length; i++) {
-      minRow.setByteArray(minValues[i], minOrdinal++);
-    }
-    return minRow;
-  }
-
-  /**
-   * This method will compute min/max values at task level
-   *
-   * @param taskMinMaxRow
-   * @param minMaxLen
-   * @param carbonRowSchema
-   * @param minMaxValue
-   * @param ordinal
-   * @param isMinValueComparison
-   */
-  private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
-      CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
-      boolean isMinValueComparison) {
-    DataMapRow row = taskMinMaxRow.getRow(ordinal);
-    byte[][] updatedMinMaxValues = minMaxValue;
-    if (null == row) {
-      CarbonRowSchema[] minSchemas =
-          ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
-      row = new DataMapRowImpl(minSchemas);
-    } else {
-      byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
-      // Compare and update min max values
-      for (int i = 0; i < minMaxLen.length; i++) {
-        int compare =
-            ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
-        if (isMinValueComparison) {
-          if (compare < 0) {
-            updatedMinMaxValues[i] = existingMinMaxValues[i];
-          }
-        } else if (compare > 0) {
-          updatedMinMaxValues[i] = existingMinMaxValues[i];
-        }
-      }
-    }
-    int minMaxOrdinal = 0;
-    // min/max value adding
-    for (int i = 0; i < minMaxLen.length; i++) {
-      row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
-    }
-    taskMinMaxRow.setRow(row, ordinal);
-  }
-
-  private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
-      throws MemoryException {
-    List<CarbonRowSchema> indexSchemas = new ArrayList<>();
-
-    // Index key
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-    getMinMaxSchema(segmentProperties, indexSchemas);
-
-    // for number of rows.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
-
-    // for table block path
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
-    // for number of pages.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
-    // for version number.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
-    // for schema updated time.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
-    //for blocklet info
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
-    // for block footer offset.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
-    // for locations
-    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-
-    // for relative blocklet id i.e. blocklet id that belongs to a particular part file.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
-
-    // for storing block length.
-    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
-
-    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
-    memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
-  }
-
-  /**
-   * Creates the schema to store summary information or the information which can be stored only
-   * once per datamap. It stores datamap level max/min of each column and partition information of
-   * datamap
-   * @param segmentProperties
-   * @throws MemoryException
-   */
-  private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
-      byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
-      throws MemoryException {
-    List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
-    getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
-    // for storing column schema
-    taskMinMaxSchemas.add(
-        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
-    // for storing file path
-    taskMinMaxSchemas.add(
-        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length));
-    // for storing file name
-    taskMinMaxSchemas.add(
-        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length));
-    // for storing segmentid
-    taskMinMaxSchemas.add(
-        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
-    CarbonRowSchema[] schema =
-        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
-    summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
-  }
-
-  private void getMinMaxSchema(SegmentProperties segmentProperties,
-      List<CarbonRowSchema> minMaxSchemas) {
-    // Index key
-    int[] minMaxLen = segmentProperties.getColumnsValueSize();
-    // do it 2 times, one for min and one for max.
-    for (int k = 0; k < 2; k++) {
-      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
-      for (int i = 0; i < minMaxLen.length; i++) {
-        if (minMaxLen[i] <= 0) {
-          boolean isVarchar = false;
-          if (i < segmentProperties.getDimensions().size()
-              && segmentProperties.getDimensions().get(i).getDataType() == DataTypes.VARCHAR) {
-            isVarchar = true;
-          }
-          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY,
-              isVarchar);
-        } else {
-          mapSchemas[i] =
-              new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
-        }
-      }
-      CarbonRowSchema mapSchema =
-          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
-              mapSchemas);
-      minMaxSchemas.add(mapSchema);
-    }
-  }
-
-  @Override
-  public boolean isScanRequired(FilterResolverIntf filterExp) {
-    FilterExecuter filterExecuter =
-        FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-    for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
-      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
-      boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
-          filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
-          getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
-      if (isScanRequired) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
-    if (memoryDMStore.getRowCount() == 0) {
-      return new ArrayList<>();
-    }
-    List<Blocklet> blocklets = new ArrayList<>();
-    int numBlocklets = 0;
-    if (filterExp == null) {
-      numBlocklets = memoryDMStore.getRowCount();
-      for (int i = 0; i < numBlocklets; i++) {
-        DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow();
-        blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
-      }
-    } else {
-      // Remove B-tree jump logic as start and end key prepared is not
-      // correct for old store scenarios
-      int startIndex = 0;
-      numBlocklets = memoryDMStore.getRowCount();
-      FilterExecuter filterExecuter =
-          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-      while (startIndex < numBlocklets) {
-        DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow();
-        int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
-        String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
-            CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
-        boolean isValid =
-            addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX),
-                getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId);
-        if (isValid) {
-          blocklets.add(createBlocklet(safeRow, blockletId));
-        }
-        startIndex++;
-      }
-    }
-    ExplainCollector.addTotalBlocklets(numBlocklets);
-    return blocklets;
-  }
-
-  @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions) {
-    if (memoryDMStore.getRowCount() == 0) {
-      return new ArrayList<>();
-    }
-    // if it has partitioned datamap but there is no partitioned information stored, it means
-    // partitions are dropped so return empty list.
-    if (partitions != null) {
-      // First get the partitions which are stored inside datamap.
-      String[] fileDetails = getFileDetails();
-      // Check the exact match of partition information inside the stored partitions.
-      boolean found = false;
-      Path folderPath = new Path(fileDetails[0]);
-      for (PartitionSpec spec : partitions) {
-        if (folderPath.equals(spec.getLocation()) && isCorrectUUID(fileDetails, spec)) {
-          found = true;
-          break;
-        }
-      }
-      if (!found) {
-        return new ArrayList<>();
-      }
-    }
-    // Prune with filters if the partitions are existed in this datamap
-    // changed segmentProperties to this.segmentProperties to make sure the pruning with its own
-    // segmentProperties.
-    // Its a temporary fix. The Interface DataMap.prune(FilterResolverIntf filterExp,
-    // SegmentProperties segmentProperties, List<PartitionSpec> partitions) should be corrected
-    return prune(filterExp, this.segmentProperties);
-  }
-
-  @Override
-  public void finish() {
-
-  }
-
-  private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) {
-    boolean needToScan = false;
-    if (spec.getUuid() != null) {
-      String[] split = spec.getUuid().split("_");
-      if (split[0].equals(fileDetails[2]) && CarbonTablePath.DataFileUtil
-          .getTimeStampFromFileName(fileDetails[1]).equals(split[1])) {
-        needToScan = true;
-      }
-    } else {
-      needToScan = true;
-    }
-    return needToScan;
-  }
-
-  /**
-   * select the blocks based on column min and max value
-   *
-   * @param filterExecuter
-   * @param maxValue
-   * @param minValue
-   * @param filePath
-   * @param blockletId
-   * @return
-   */
-  private boolean addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, byte[][] maxValue,
-      byte[][] minValue, String filePath, int blockletId) {
-    BitSet bitSet = null;
-    if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
-      String uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
-      // this case will come in case of old store where index file does not contain the
-      // blocklet information
-      if (blockletId != -1) {
-        uniqueBlockPath = uniqueBlockPath + CarbonCommonConstants.FILE_SEPARATOR + blockletId;
-      }
-      bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
-          .isFilterValuesPresentInBlockOrBlocklet(maxValue, minValue, uniqueBlockPath);
-    } else {
-      bitSet = filterExecuter.isScanRequired(maxValue, minValue);
-    }
-    if (!bitSet.isEmpty()) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
   public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
-    int index = Integer.parseInt(blockletId);
-    DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow();
-    return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
-  }
-
-  /**
-   * Get the index file name of the blocklet data map
-   *
-   * @return
-   */
-  public String getIndexFileName() {
-    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
-    try {
-      return new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
-          CarbonCommonConstants.DEFAULT_CHARSET);
-    } catch (UnsupportedEncodingException e) {
-      // should never happen!
-      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
+    if (isLegacyStore) {
+      super.getDetailedBlocklet(blockletId);
     }
+    int absoluteBlockletId = Integer.parseInt(blockletId);
+    DataMapRow safeRow = memoryDMStore.getDataMapRow(absoluteBlockletId).convertToSafeRow();
+    short relativeBlockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
+    return createBlocklet(safeRow, relativeBlockletId);
   }
 
-  private byte[][] getMinMaxValue(DataMapRow row, int index) {
-    DataMapRow minMaxRow = row.getRow(index);
-    byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
-    for (int i = 0; i < minMax.length; i++) {
-      minMax[i] = minMaxRow.getByteArray(i);
-    }
-    return minMax;
+  protected short getBlockletId(DataMapRow dataMapRow) {
+    return dataMapRow.getShort(BLOCKLET_ID_INDEX);
   }
 
-  private ExtendedBlocklet createBlocklet(DataMapRow row, int blockletId) {
+  protected ExtendedBlocklet createBlocklet(DataMapRow row, short blockletId) {
     ExtendedBlocklet blocklet = new ExtendedBlocklet(
         new String(row.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
         blockletId + "");
-    BlockletDetailInfo detailInfo = new BlockletDetailInfo();
-    detailInfo.setRowCount(row.getInt(ROW_COUNT_INDEX));
-    detailInfo.setPagesCount(row.getShort(PAGE_COUNT_INDEX));
-    detailInfo.setVersionNumber(row.getShort(VERSION_INDEX));
-    detailInfo.setBlockletId((short) blockletId);
-    detailInfo.setDimLens(columnCardinality);
-    detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
-    detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX));
-    try {
-      blocklet.setLocation(
-          new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
-              .split(","));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    BlockletDetailInfo detailInfo = getBlockletDetailInfo(row, blockletId, blocklet);
+    detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCKLET_INFO_INDEX));
+    detailInfo.setPagesCount(row.getShort(BLOCKLET_PAGE_COUNT_INDEX));
     blocklet.setDetailInfo(detailInfo);
-    detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
-    detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
-    detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH));
     return blocklet;
   }
 
-  private String[] getFileDetails() {
-    try {
-      String[] fileDetails = new String[3];
-      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
-      fileDetails[0] =
-          new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
-      fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
-          CarbonCommonConstants.DEFAULT_CHARSET);
-      fileDetails[2] = new String(unsafeRow.getByteArray(SEGMENTID),
-          CarbonCommonConstants.DEFAULT_CHARSET);
-      return fileDetails;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-
-  /**
-   * Binary search used to get the first tentative index row based on
-   * search key
-   *
-   * @param key search key
-   * @return first tentative block
-   */
-  private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
-    int childNodeIndex;
-    int low = 0;
-    int high = memoryDMStore.getRowCount() - 1;
-    int mid = 0;
-    int compareRes = -1;
-    //
-    while (low <= high) {
-      mid = (low + high) >>> 1;
-      // compare the entries
-      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
-      if (compareRes < 0) {
-        high = mid - 1;
-      } else if (compareRes > 0) {
-        low = mid + 1;
-      } else {
-        // if key is matched then get the first entry
-        int currentPos = mid;
-        while (currentPos - 1 >= 0
-            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) {
-          currentPos--;
-        }
-        mid = currentPos;
-        break;
-      }
-    }
-    // if compare result is less than zero then we
-    // and mid is more than 0 then we need to previous block as duplicates
-    // record can be present
-    if (compareRes < 0) {
-      if (mid > 0) {
-        mid--;
-      }
-      childNodeIndex = mid;
-    } else {
-      childNodeIndex = mid;
-    }
-    // get the leaf child
-    return childNodeIndex;
-  }
-
-  /**
-   * Binary search used to get the last tentative block  based on
-   * search key
-   *
-   * @param key search key
-   * @return first tentative block
-   */
-  private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
-    int childNodeIndex;
-    int low = 0;
-    int high = memoryDMStore.getRowCount() - 1;
-    int mid = 0;
-    int compareRes = -1;
-    //
-    while (low <= high) {
-      mid = (low + high) >>> 1;
-      // compare the entries
-      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
-      if (compareRes < 0) {
-        high = mid - 1;
-      } else if (compareRes > 0) {
-        low = mid + 1;
-      } else {
-        int currentPos = mid;
-        // if key is matched then get the first entry
-        while (currentPos + 1 < memoryDMStore.getRowCount()
-            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) {
-          currentPos++;
-        }
-        mid = currentPos;
-        break;
-      }
-    }
-    // if compare result is less than zero then we
-    // and mid is more than 0 then we need to previous block as duplicates
-    // record can be present
-    if (compareRes < 0) {
-      if (mid > 0) {
-        mid--;
-      }
-      childNodeIndex = mid;
-    } else {
-      childNodeIndex = mid;
-    }
-    return childNodeIndex;
-  }
-
-  private DataMapRow convertToRow(IndexKey key) {
-    ByteBuffer buffer =
-        ByteBuffer.allocate(key.getDictionaryKeys().length + key.getNoDictionaryKeys().length + 8);
-    buffer.putInt(key.getDictionaryKeys().length);
-    buffer.putInt(key.getNoDictionaryKeys().length);
-    buffer.put(key.getDictionaryKeys());
-    buffer.put(key.getNoDictionaryKeys());
-    DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema());
-    dataMapRow.setByteArray(buffer.array(), 0);
-    return dataMapRow;
-  }
-
-  public byte[] getColumnSchemaBinary() {
-    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
-    return unsafeRow.getByteArray(SCHEMA);
-  }
-
-  /**
-   * Convert schema to binary
-   */
-  private byte[] convertSchemaToBinary(List<ColumnSchema> columnSchemas) throws IOException {
-    ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    DataOutput dataOutput = new DataOutputStream(stream);
-    dataOutput.writeShort(columnSchemas.size());
-    for (ColumnSchema columnSchema : columnSchemas) {
-      if (columnSchema.getColumnReferenceId() == null) {
-        columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
-      }
-      columnSchema.write(dataOutput);
-    }
-    byte[] byteArray = stream.toByteArray();
-    // Compress with snappy to reduce the size of schema
-    return Snappy.rawCompress(byteArray, byteArray.length);
-  }
-
-  @Override
-  public void clear() {
-    if (memoryDMStore != null) {
-      memoryDMStore.freeMemory();
-      memoryDMStore = null;
-      segmentProperties = null;
-    }
-    // clear task min/max unsafe memory
-    if (null != summaryDMStore) {
-      summaryDMStore.freeMemory();
-      summaryDMStore = null;
-    }
-  }
-
-  public long getMemorySize() {
-    long memoryUsed = 0L;
-    if (memoryDMStore != null) {
-      memoryUsed += memoryDMStore.getMemoryUsed();
-    }
-    if (null != summaryDMStore) {
-      memoryUsed += summaryDMStore.getMemoryUsed();
-    }
-    return memoryUsed;
-  }
-
-  public SegmentProperties getSegmentProperties() {
-    return segmentProperties;
-  }
-
-  public void setSegmentProperties(SegmentProperties segmentProperties) {
-    this.segmentProperties = segmentProperties;
-  }
-
-  public int[] getColumnCardinality() {
-    return columnCardinality;
-  }
-
-  private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
-      throws MemoryException {
-    AbstractMemoryDMStore memoryDMStore;
-    if (addToUnsafe) {
-      memoryDMStore = new UnsafeMemoryDMStore(schema);
-    } else {
-      memoryDMStore = new SafeMemoryDMStore(schema);
-    }
-    return memoryDMStore;
-  }
-
-  /**
-   * This method will ocnvert safe to unsafe memory DM store
-   *
-   * @throws MemoryException
-   */
-  public void convertToUnsafeDMStore() throws MemoryException {
-    if (memoryDMStore instanceof SafeMemoryDMStore) {
-      UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore();
-      memoryDMStore.freeMemory();
-      memoryDMStore = unsafeMemoryDMStore;
-    }
-    if (summaryDMStore instanceof SafeMemoryDMStore) {
-      UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore();
-      summaryDMStore.freeMemory();
-      summaryDMStore = unsafeSummaryMemoryDMStore;
-    }
-  }
-
-  /**
-   * Read column schema from binary
-   * @param schemaArray
-   * @throws IOException
-   */
-  public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
-    // uncompress it.
-    schemaArray = Snappy.uncompress(schemaArray);
-    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
-    DataInput schemaInput = new DataInputStream(schemaStream);
-    List<ColumnSchema> columnSchemas = new ArrayList<>();
-    int size = schemaInput.readShort();
-    for (int i = 0; i < size; i++) {
-      ColumnSchema columnSchema = new ColumnSchema();
-      columnSchema.readFields(schemaInput);
-      columnSchemas.add(columnSchema);
-    }
-    return columnSchemas;
-  }
-
-  public long getBlockletSchemaTime() {
-    return blockletSchemaTime;
-  }
-
 }
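
To make the segregation concrete, the following illustrative sketch (not part of the patch) contrasts the two row layouts produced by the new SchemaGenerator referenced above: CACHE_LEVEL = BLOCK keeps one index row per carbondata file, while CACHE_LEVEL = BLOCKLET keeps one row per blocklet and appends the blocklet-only columns (serialized BlockletInfo, page count, relative blocklet id), as visible in the SchemaGenerator diff further below. Class and method names are taken from this commit; the wrapper class itself is hypothetical.

import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
import org.apache.carbondata.core.indexstore.schema.SchemaGenerator;

final class CacheLevelSchemaSketch {
  // Prints how many index columns each cache level stores per row for a given segment.
  static void compare(SegmentProperties segmentProperties) {
    // one row per carbondata file (CACHE_LEVEL = BLOCK)
    CarbonRowSchema[] blockSchema = SchemaGenerator.createBlockSchema(segmentProperties);
    // one row per blocklet (CACHE_LEVEL = BLOCKLET): the block columns plus
    // BlockletInfo bytes, page count and relative blocklet id
    CarbonRowSchema[] blockletSchema = SchemaGenerator.createBlockletSchema(segmentProperties);
    System.out.println("block-level columns:    " + blockSchema.length);
    System.out.println("blocklet-level columns: " + blockletSchema.length);
  }
}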

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 836b6a3..f9bc7b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -68,6 +68,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   private static final Log LOG = LogFactory.getLog(BlockletDataMapFactory.class);
   private static final String NAME = "clustered.btree.blocklet";
+  /**
+   * variable for cache level BLOCKLET
+   */
+  private static final String CACHE_LEVEL_BLOCKLET = "BLOCKLET";
 
   public static final DataMapSchema DATA_MAP_SCHEMA =
       new DataMapSchema(NAME, BlockletDataMapFactory.class.getName());
@@ -86,6 +90,25 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
   }
 
+  /**
+   * create dataMap based on cache level
+   *
+   * @param carbonTable
+   * @return
+   */
+  public static DataMap createDataMap(CarbonTable carbonTable) {
+    boolean cacheLevelBlock =
+        BlockletDataMapUtil.isCacheLevelBlock(carbonTable, CACHE_LEVEL_BLOCKLET);
+    cacheLevelBlock = false;
+    if (cacheLevelBlock) {
+      // case1: when CACHE_LEVEL = BLOCK
+      return new BlockDataMap();
+    } else {
+      // case2: when CACHE_LEVEL = BLOCKLET
+      return new BlockletDataMap();
+    }
+  }
+
   @Override
   public DataMapWriter createWriter(Segment segment, String shardName,
       SegmentProperties segmentProperties) throws IOException {
@@ -187,10 +210,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       throws IOException {
     for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : identifiersWrapper) {
       BlockletDataMapIndexWrapper wrapper = cache.get(identifierWrapper);
-      List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+      List<BlockDataMap> dataMaps = wrapper.getDataMaps();
       for (DataMap dataMap : dataMaps) {
-        if (((BlockletDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) {
-          return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+        if (((BlockDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) {
+          return ((BlockDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
         }
       }
     }
@@ -244,7 +267,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
             new TableBlockIndexUniqueIdentifierWrapper(blockIndex, this.getCarbonTable());
         BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndexWrapper);
         if (null != wrapper) {
-          List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+          List<BlockDataMap> dataMaps = wrapper.getDataMaps();
           for (DataMap dataMap : dataMaps) {
             if (dataMap != null) {
               cache.invalidate(blockIndexWrapper);
@@ -314,8 +337,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
     assert (dataMaps.size() > 0);
     CoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
-    assert (coarseGrainDataMap instanceof BlockletDataMap);
-    BlockletDataMap dataMap = (BlockletDataMap) coarseGrainDataMap;
+    assert (coarseGrainDataMap instanceof BlockDataMap);
+    BlockDataMap dataMap = (BlockDataMap) coarseGrainDataMap;
     return dataMap.getSegmentProperties();
   }
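
As a usage sketch, the hypothetical helper below shows the selection path the new createDataMap method provides; BlockletDataMapUtil.isCacheLevelBlock is assumed to inspect the table's CACHE_LEVEL setting, and in this patch the computed flag is still overwritten with false, so a BlockletDataMap is currently returned for every table.

import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

final class CacheLevelSelectionSketch {
  // Hypothetical helper: returns the DataMap implementation the factory picks for a table.
  // A BlockDataMap is intended for CACHE_LEVEL = BLOCK and a BlockletDataMap for
  // CACHE_LEVEL = BLOCKLET; with this patch the BLOCK branch is still hard-disabled.
  static DataMap pickDataMap(CarbonTable carbonTable) {
    return BlockletDataMapFactory.createDataMap(carbonTable);
  }
}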
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
new file mode 100644
index 0000000..3826b07
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+/**
+ * holder for blocklet info indexes in a DataMap row
+ */
+public interface BlockletDataMapRowIndexes {
+
+  // Each DataMapRow Indexes for blocklet and block dataMap
+  int MIN_VALUES_INDEX = 0;
+
+  int MAX_VALUES_INDEX = 1;
+
+  int ROW_COUNT_INDEX = 2;
+
+  int FILE_PATH_INDEX = 3;
+
+  int VERSION_INDEX = 4;
+
+  int SCHEMA_UPADATED_TIME_INDEX = 5;
+
+  int BLOCK_FOOTER_OFFSET = 6;
+
+  int LOCATIONS = 7;
+
+  int BLOCK_LENGTH = 8;
+
+  // below variables are specific for blockletDataMap
+  int BLOCKLET_INFO_INDEX = 9;
+
+  int BLOCKLET_PAGE_COUNT_INDEX = 10;
+
+  int BLOCKLET_ID_INDEX = 11;
+
+  // Summary dataMap row indexes
+  int TASK_MIN_VALUES_INDEX = 0;
+
+  int TASK_MAX_VALUES_INDEX = 1;
+
+  int SUMMARY_SCHEMA = 2;
+
+  int SUMMARY_INDEX_PATH = 3;
+
+  int SUMMARY_INDEX_FILE_NAME = 4;
+
+  int SUMMARY_SEGMENTID = 5;
+
+  int SUMMARY_BLOCKLET_COUNT = 6;
+
+}
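
To show how these constants are meant to be used, here is a small hypothetical reader (not part of the patch) that pulls the commonly used per-blocklet fields out of a DataMapRow laid out by SchemaGenerator.createBlockletSchema; the getters mirror the ones used in BlockletDataMap above.

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapRowIndexes;
import org.apache.carbondata.core.indexstore.row.DataMapRow;

final class BlockletRowReaderSketch implements BlockletDataMapRowIndexes {
  // Formats one blocklet-level index row; implementing the interface makes the
  // index constants visible without qualification.
  static String describe(DataMapRow row) {
    int rowCount = row.getInt(ROW_COUNT_INDEX);
    short pageCount = row.getShort(BLOCKLET_PAGE_COUNT_INDEX);
    short blockletId = row.getShort(BLOCKLET_ID_INDEX);
    String filePath = new String(row.getByteArray(FILE_PATH_INDEX),
        CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
    return filePath + "#" + blockletId + ": " + rowCount + " rows, " + pageCount + " pages";
  }
}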

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 127e2a9..a245bc0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -229,7 +229,7 @@ public class UnsafeDataMapRow extends DataMapRow {
             getUnsafe().copyMemory(
                 block.getBaseObject(),
                 block.getBaseOffset() + pointer + runningLength,
-                    data,
+                data,
                 BYTE_ARRAY_OFFSET,
                 data.length);
             row.setByteArray(data, i);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
new file mode 100644
index 0000000..310cabf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+/**
+ * class for creating schema for a given DataMap
+ */
+public class SchemaGenerator {
+
+  /**
+   * Method for creating the block schema. Each block row will share the same schema
+   *
+   * @param segmentProperties segment metadata used to derive the min/max column sizes
+   * @return schema describing one block-level DataMapRow
+   */
+  public static CarbonRowSchema[] createBlockSchema(SegmentProperties segmentProperties) {
+    List<CarbonRowSchema> indexSchemas = new ArrayList<>();
+    // get MinMax Schema
+    getMinMaxSchema(segmentProperties, indexSchemas);
+    // for number of rows.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
+    // for table block path
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    // for version number.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+    // for schema updated time.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+    // for block footer offset.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+    // for locations
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    // for storing block length.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    return schema;
+  }
+
+  /**
+   * Method for creating blocklet Schema. Each blocklet row will share the same schema
+   *
+   * @param segmentProperties segment metadata used to derive the min/max column sizes
+   * @return schema describing one blocklet-level DataMapRow
+   */
+  public static CarbonRowSchema[] createBlockletSchema(SegmentProperties segmentProperties) {
+    List<CarbonRowSchema> indexSchemas = new ArrayList<>();
+    // get MinMax Schema
+    getMinMaxSchema(segmentProperties, indexSchemas);
+    // for number of rows.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
+    // for table block path
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    // for version number.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+    // for schema updated time.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+    // for block footer offset.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+    // for locations
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    // for storing block length.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+    //for blocklet info
+    indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    // for number of pages.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+    // for relative blocklet id i.e. blocklet id that belongs to a particular part file
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
+    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    return schema;
+  }
+
+  /**
+   * Creates the schema to store summary information, i.e. the information which can be stored
+   * only once per datamap: the datamap level min/max of each column along with the column
+   * schema, index file path, index file name and segment id of the datamap
+   *
+   * @param segmentProperties segment metadata used to derive the min/max column sizes
+   * @throws MemoryException
+   */
+  public static CarbonRowSchema[] createTaskSummarySchema(SegmentProperties segmentProperties,
+      byte[] schemaBinary, byte[] filePath, byte[] fileName, byte[] segmentId,
+      boolean storeBlockletCount) throws MemoryException {
+    List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
+    // get MinMax Schema
+    getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
+    // for storing column schema
+    taskMinMaxSchemas
+        .add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
+    // for storing file path
+    taskMinMaxSchemas
+        .add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length));
+    // for storing file name
+    taskMinMaxSchemas
+        .add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length));
+    // for storing segmentid
+    taskMinMaxSchemas
+        .add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
+    // flag to check whether it is required to store the blocklet count of each carbondata file
+    // as binary in the summary schema. This will be true when it is not a legacy store
+    // (store version > 1.1) and CACHE_LEVEL=BLOCK
+    if (storeBlockletCount) {
+      taskMinMaxSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+    }
+    CarbonRowSchema[] schema =
+        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+    return schema;
+  }
+
+  /**
+   * Method to create schema for storing min/max data
+   *
+   * @param segmentProperties segment metadata used to derive the min/max column sizes
+   * @param minMaxSchemas list to which the min and max struct schemas are added
+   */
+  private static void getMinMaxSchema(SegmentProperties segmentProperties,
+      List<CarbonRowSchema> minMaxSchemas) {
+    // Index key
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    // do it 2 times, one for min and one for max.
+    for (int k = 0; k < 2; k++) {
+      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
+      for (int i = 0; i < minMaxLen.length; i++) {
+        if (minMaxLen[i] <= 0) {
+          boolean isVarchar = false;
+          if (i < segmentProperties.getDimensions().size()
+              && segmentProperties.getDimensions().get(i).getDataType() == DataTypes.VARCHAR) {
+            isVarchar = true;
+          }
+          mapSchemas[i] =
+              new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY, isVarchar);
+        } else {
+          mapSchemas[i] =
+              new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+        }
+      }
+      CarbonRowSchema mapSchema =
+          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+              mapSchemas);
+      minMaxSchemas.add(mapSchema);
+    }
+  }
+}
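
The ordinals of the schemas generated above line up with BlockletDataMapRowIndexes: the two
min/max structs occupy positions 0 and 1, so position 2 is the INT added for the number of rows,
and so on. A minimal usage sketch, assuming a SegmentProperties instance is at hand and that the
DataMapRowImpl setters take (value, ordinal) as elsewhere in the codebase; numberOfRows and
pageCount are hypothetical values:

    CarbonRowSchema[] schema = SchemaGenerator.createBlockletSchema(segmentProperties);
    DataMapRow row = new DataMapRowImpl(schema);
    // ordinal 2 = ROW_COUNT_INDEX, ordinal 10 = BLOCKLET_PAGE_COUNT_INDEX
    row.setInt(numberOfRows, BlockletDataMapRowIndexes.ROW_COUNT_INDEX);
    row.setShort(pageCount, BlockletDataMapRowIndexes.BLOCKLET_PAGE_COUNT_INDEX);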

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 63cfa21..f0f7272 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -175,8 +175,10 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
         // TODO. Nested File Paths.
         if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
           // Get Segment Name from the IndexFile.
+          String indexFilePath =
+              FileFactory.getUpdatedFilePath(carbonIndexFiles[i].getAbsolutePath());
           String segId =
-              getSegmentID(carbonIndexFiles[i].getName(), carbonIndexFiles[i].getAbsolutePath());
+              getSegmentID(carbonIndexFiles[i].getName(), indexFilePath);
           // TODO. During Partition table handling, place Segment File Name.
           List<String> indexList;
           SegmentRefreshInfo segmentRefreshInfo;
@@ -190,7 +192,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
             indexList = indexFileStore.get(segId);
             segmentRefreshInfo = segmentTimestampUpdaterMap.get(segId);
           }
-          indexList.add(carbonIndexFiles[i].getAbsolutePath());
+          indexList.add(indexFilePath);
           if (segmentRefreshInfo.getSegmentUpdatedTimestamp() < carbonIndexFiles[i]
               .getLastModifiedTime()) {
             segmentRefreshInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index e7bbea0..0f11bb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -129,22 +130,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     Collections.sort(queryModel.getTableBlockInfos());
 
     List<AbstractIndex> indexList = new ArrayList<>();
-    Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
-    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
-      List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
-      if (tableBlockInfos == null) {
-        tableBlockInfos = new ArrayList<>();
-        listMap.put(blockInfo.getFilePath(), tableBlockInfos);
-      }
-      BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
-      // This is the case of old stores where blocklet information is not available so read
-      // the blocklet information from block file
-      if (blockletDetailInfo.getBlockletInfo() == null) {
-        readAndFillBlockletInfo(blockInfo, tableBlockInfos, blockletDetailInfo);
-      } else {
-        tableBlockInfos.add(blockInfo);
-      }
-    }
+    Map<String, List<TableBlockInfo>> listMap = getFilePathToTableBlockInfoMapping(queryModel);
     for (List<TableBlockInfo> tableBlockInfos : listMap.values()) {
       indexList.add(new IndexWrapper(tableBlockInfos));
     }
@@ -194,38 +180,101 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
   }
 
   /**
+   * Method to prepare the carbondata file path to TableBlockInfo mapping
+   *
+   * @param queryModel query model holding the pruned table block infos
+   * @return mapping of file path to the TableBlockInfo entries belonging to that file
+   * @throws IOException
+   */
+  private Map<String, List<TableBlockInfo>> getFilePathToTableBlockInfoMapping(
+      QueryModel queryModel) throws IOException {
+    Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+    // this is introduced to handle the case when CACHE_LEVEL=BLOCK and a few other dataMaps
+    // like lucene or bloom are created on the table. In that case all the dataMaps will do
+    // blocklet level pruning and blockInfo entries will be repeated with different blockletIds
+    Map<String, DataFileFooter> filePathToFileFooterMapping = new HashMap<>();
+    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
+      List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+      if (tableBlockInfos == null) {
+        tableBlockInfos = new ArrayList<>();
+        listMap.put(blockInfo.getFilePath(), tableBlockInfos);
+      }
+      BlockletDetailInfo blockletDetailInfo = blockInfo.getDetailInfo();
+      // This case can arise in 2 scenarios:
+      // 1. old stores (version 1.1 or earlier) where blocklet information is not available,
+      // so read the blocklet information from the block file
+      // 2. CACHE_LEVEL is set to block
+      if (blockletDetailInfo.getBlockletInfo() == null) {
+        readAndFillBlockletInfo(filePathToFileFooterMapping, tableBlockInfos, blockInfo,
+            blockletDetailInfo);
+      } else {
+        tableBlockInfos.add(blockInfo);
+      }
+    }
+    return listMap;
+  }
+
+  /**
    * Read the file footer of block file and get the blocklets to query
    */
-  private void readAndFillBlockletInfo(TableBlockInfo blockInfo,
-      List<TableBlockInfo> tableBlockInfos, BlockletDetailInfo blockletDetailInfo)
-      throws IOException {
+  private void readAndFillBlockletInfo(Map<String, DataFileFooter> filePathToFileFooterMapping,
+      List<TableBlockInfo> tableBlockInfos, TableBlockInfo blockInfo,
+      BlockletDetailInfo blockletDetailInfo) throws IOException {
     blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
-    blockInfo.setDetailInfo(null);
-    DataFileFooter fileFooter = CarbonUtil.readMetadatFile(blockInfo);
-    blockInfo.setDetailInfo(blockletDetailInfo);
+    DataFileFooter fileFooter = filePathToFileFooterMapping.get(blockInfo.getFilePath());
+    if (null == fileFooter) {
+      blockInfo.setDetailInfo(null);
+      fileFooter = CarbonUtil.readMetadatFile(blockInfo);
+      filePathToFileFooterMapping.put(blockInfo.getFilePath(), fileFooter);
+      blockInfo.setDetailInfo(blockletDetailInfo);
+    }
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
-    short count = 0;
-    for (BlockletInfo blockletInfo: blockletList) {
-      TableBlockInfo info = blockInfo.copy();
-      BlockletDetailInfo detailInfo = info.getDetailInfo();
-      detailInfo.setRowCount(blockletInfo.getNumberOfRows());
+    // cases when blockletId will be -1:
+    // 1. legacy store
+    // 2. CACHE_LEVEL is block and no dataMap apart from the blocklet dataMap has been
+    // created for the table
+    // In both cases the entries correspond to the number of blocks rather than the
+    // number of blocklets
+    if (blockletDetailInfo.getBlockletId() != -1) {
+      // fill the info only for given blockletId in detailInfo
+      BlockletInfo blockletInfo = blockletList.get(blockletDetailInfo.getBlockletId());
+      fillBlockletInfoToTableBlock(tableBlockInfos, blockInfo, blockletDetailInfo, fileFooter,
+          blockletInfo, blockletDetailInfo.getBlockletId());
+    } else {
+      short count = 0;
+      for (BlockletInfo blockletInfo : blockletList) {
+        fillBlockletInfoToTableBlock(tableBlockInfos, blockInfo, blockletDetailInfo, fileFooter,
+            blockletInfo, count);
+        count++;
+      }
+    }
+  }
+
+  private void fillBlockletInfoToTableBlock(List<TableBlockInfo> tableBlockInfos,
+      TableBlockInfo blockInfo, BlockletDetailInfo blockletDetailInfo, DataFileFooter fileFooter,
+      BlockletInfo blockletInfo, short blockletId) {
+    TableBlockInfo info = blockInfo.copy();
+    BlockletDetailInfo detailInfo = info.getDetailInfo();
+    detailInfo.setRowCount(blockletInfo.getNumberOfRows());
+    byte[][] maxValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues();
+    byte[][] minValues = blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues();
+    if (blockletDetailInfo.isLegacyStore()) {
       // update min and max values in case of old store for measures as min and max is written
-      // opposite for measures in old store
-      byte[][] maxValues = CarbonUtil.updateMinMaxValues(fileFooter,
+      // opposite for measures in old store (store version <= 1.1)
+      maxValues = CarbonUtil.updateMinMaxValues(fileFooter,
           blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues(),
           blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues(), false);
-      byte[][] minValues = CarbonUtil.updateMinMaxValues(fileFooter,
+      minValues = CarbonUtil.updateMinMaxValues(fileFooter,
           blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues(),
           blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues(), true);
-      blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
-      blockletInfo.getBlockletIndex().getMinMaxIndex().setMinValues(minValues);
-      detailInfo.setBlockletInfo(blockletInfo);
-      detailInfo.setPagesCount((short) blockletInfo.getNumberOfPages());
-      detailInfo.setBlockletId(count);
       info.setDataBlockFromOldStore(true);
-      tableBlockInfos.add(info);
-      count++;
     }
+    blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues);
+    blockletInfo.getBlockletIndex().getMinMaxIndex().setMinValues(minValues);
+    detailInfo.setBlockletInfo(blockletInfo);
+    detailInfo.setBlockletId(blockletId);
+    detailInfo.setPagesCount((short) blockletInfo.getNumberOfPages());
+    tableBlockInfos.add(info);
   }
 
   private List<TableBlockUniqueIdentifier> prepareTableBlockUniqueIdentifier(
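
The new mapping caches one DataFileFooter per carbondata file so that, when CACHE_LEVEL=BLOCK
produces several TableBlockInfo entries for the same file (one per blocklet id), the footer is
read only once. A minimal sketch of that memoization pattern, with hypothetical names
(blockInfos, footerCache); CarbonUtil.readMetadatFile is the footer reader used in the patch:

    Map<String, DataFileFooter> footerCache = new HashMap<>();
    for (TableBlockInfo blockInfo : blockInfos) {
      DataFileFooter footer = footerCache.get(blockInfo.getFilePath());
      if (footer == null) {
        // expensive footer I/O, done at most once per file
        footer = CarbonUtil.readMetadatFile(blockInfo);
        footerCache.put(blockInfo.getFilePath(), footer);
      }
      // expand blockInfo into blocklet-level entries using footer.getBlockletList()
    }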

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 518cd03..d3857f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -18,6 +18,8 @@
 package org.apache.carbondata.core.util;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,6 +30,7 @@ import java.util.TreeMap;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -37,7 +40,10 @@ import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrap
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -207,6 +213,22 @@ public class BlockletDataMapUtil {
     return tableBlockIndexUniqueIdentifiers;
   }
 
+  /**
+   * Method to check whether CACHE_LEVEL is set to BLOCK for the given table
+   *
+   * @param carbonTable table whose CACHE_LEVEL property is read
+   * @param cacheLevelBlocklet property value that denotes blocklet level caching
+   * @return true if CACHE_LEVEL is anything other than blocklet level caching
+   */
+  public static boolean isCacheLevelBlock(CarbonTable carbonTable, String cacheLevelBlocklet) {
+    String cacheLevel = carbonTable.getTableInfo().getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.CACHE_LEVEL);
+    return !cacheLevelBlocklet.equals(cacheLevel);
+  }
+
   private static boolean isSameColumnSchemaList(List<ColumnSchema> indexFileColumnList,
       List<ColumnSchema> tableColumnList) {
     if (indexFileColumnList.size() != tableColumnList.size()) {
@@ -221,4 +243,82 @@ public class BlockletDataMapUtil {
     }
     return true;
   }
+
+  /**
+   * Fill the measure min values with the datatype minimum; this is needed for backward
+   * compatibility as older versions don't store min values for measures
+   */
+  public static byte[][] updateMinValues(SegmentProperties segmentProperties, byte[][] minValues) {
+    byte[][] updatedValues = minValues;
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    if (minValues.length < minMaxLen.length) {
+      updatedValues = new byte[minMaxLen.length][];
+      System.arraycopy(minValues, 0, updatedValues, 0, minValues.length);
+      List<CarbonMeasure> measures = segmentProperties.getMeasures();
+      ByteBuffer buffer = ByteBuffer.allocate(8);
+      for (int i = 0; i < measures.size(); i++) {
+        buffer.rewind();
+        DataType dataType = measures.get(i).getDataType();
+        if (dataType == DataTypes.BYTE) {
+          buffer.putLong(Byte.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.SHORT) {
+          buffer.putLong(Short.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.INT) {
+          buffer.putLong(Integer.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.LONG) {
+          buffer.putLong(Long.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        } else if (DataTypes.isDecimal(dataType)) {
+          updatedValues[minValues.length + i] =
+              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
+        } else {
+          buffer.putDouble(Double.MIN_VALUE);
+          updatedValues[minValues.length + i] = buffer.array().clone();
+        }
+      }
+    }
+    return updatedValues;
+  }
+
+  /**
+   * Fill the measure max values with the datatype maximum; this is needed for backward
+   * compatibility as older versions don't store max values for measures
+   */
+  public static byte[][] updateMaxValues(SegmentProperties segmentProperties, byte[][] maxValues) {
+    byte[][] updatedValues = maxValues;
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    if (maxValues.length < minMaxLen.length) {
+      updatedValues = new byte[minMaxLen.length][];
+      System.arraycopy(maxValues, 0, updatedValues, 0, maxValues.length);
+      List<CarbonMeasure> measures = segmentProperties.getMeasures();
+      ByteBuffer buffer = ByteBuffer.allocate(8);
+      for (int i = 0; i < measures.size(); i++) {
+        buffer.rewind();
+        DataType dataType = measures.get(i).getDataType();
+        if (dataType == DataTypes.BYTE) {
+          buffer.putLong(Byte.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.SHORT) {
+          buffer.putLong(Short.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.INT) {
+          buffer.putLong(Integer.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (dataType == DataTypes.LONG) {
+          buffer.putLong(Long.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        } else if (DataTypes.isDecimal(dataType)) {
+          updatedValues[maxValues.length + i] =
+              DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
+        } else {
+          buffer.putDouble(Double.MAX_VALUE);
+          updatedValues[maxValues.length + i] = buffer.array().clone();
+        }
+      }
+    }
+    return updatedValues;
+  }
 }
\ No newline at end of file
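
A usage sketch for the new padding helpers (illustrative, not from the patch): when a legacy
store wrote min/max values only for dimensions, updateMinValues/updateMaxValues extend the arrays
so every measure column also gets an entry. segmentProperties, legacyMinValues and
legacyMaxValues are assumed to come from the loaded footer:

    byte[][] paddedMins = BlockletDataMapUtil.updateMinValues(segmentProperties, legacyMinValues);
    byte[][] paddedMaxs = BlockletDataMapUtil.updateMaxValues(segmentProperties, legacyMaxValues);
    // after padding, the arrays cover every column: dimensions first, then measures
    assert paddedMins.length == segmentProperties.getColumnsValueSize().length;
    assert paddedMaxs.length == segmentProperties.getColumnsValueSize().length;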

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
index fa142aa..85de7c4 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMap.java
@@ -42,8 +42,8 @@ public class TestBlockletDataMap extends AbstractDictionaryCacheTest {
       }
     };
 
-    BlockletDataMap blockletDataMap = new BlockletDataMap();
-    Method method = BlockletDataMap.class
+    BlockDataMap blockletDataMap = new BlockletDataMap();
+    Method method = BlockDataMap.class
         .getDeclaredMethod("addBlockBasedOnMinMaxValue", FilterExecuter.class, byte[][].class,
             byte[][].class, String.class, int.class);
     method.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/61187115/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index 526f630..fa7bf08 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -89,7 +89,7 @@ public class TestBlockletDataMapFactory {
   @Test public void addDataMapToCache()
       throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
       IllegalAccessException {
-    List<BlockletDataMap> dataMaps = new ArrayList<>();
+    List<BlockDataMap> dataMaps = new ArrayList<>();
     Method method = BlockletDataMapFactory.class
         .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifierWrapper.class,
             BlockletDataMapIndexWrapper.class);

