carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper
Date Mon, 07 May 2018 08:56:12 GMT
[CARBONDATA-2310] Refactored code to improve Distributable interface & [CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

This PR has two JIRA fixes
[CARBONDATA-2310] Refactored code to improve Distributable interface
[CARBONDATA-2362] Changing the Cacheable object from DataMap to Wrapper

This closes #2244


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/531ecdf3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/531ecdf3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/531ecdf3

Branch: refs/heads/master
Commit: 531ecdf3f40c064d4ff6ad36c43fa90a2d423588
Parents: a7926ea
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Fri Apr 27 23:03:52 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon May 7 13:11:29 2018 +0530

----------------------------------------------------------------------
 .../org/apache/carbondata/core/cache/Cache.java |  10 +
 .../dictionary/AbstractDictionaryCache.java     |   6 +
 .../core/constants/CarbonCommonConstants.java   |   3 +
 .../core/datamap/dev/CacheableDataMap.java      |  51 ++++
 .../core/datastore/SegmentTaskIndexStore.java   |   7 +
 .../filesystem/AbstractDFSCarbonFile.java       |   5 +-
 .../core/datastore/filesystem/CarbonFile.java   |   3 +-
 .../datastore/filesystem/LocalCarbonFile.java   |   3 +-
 .../core/indexstore/AbstractMemoryDMStore.java  |  63 +++++
 .../indexstore/BlockletDataMapIndexStore.java   | 187 ++++++++-------
 .../indexstore/BlockletDataMapIndexWrapper.java |  52 +++++
 .../core/indexstore/BlockletDetailInfo.java     |  66 ++++--
 .../core/indexstore/SafeMemoryDMStore.java      | 105 +++++++++
 .../TableBlockIndexUniqueIdentifier.java        |   5 +-
 .../core/indexstore/UnsafeMemoryDMStore.java    |  25 +-
 .../blockletindex/BlockletDataMap.java          | 232 ++++++++++++-------
 .../BlockletDataMapDistributable.java           |  12 +
 .../blockletindex/BlockletDataMapFactory.java   | 127 ++++++----
 .../blockletindex/BlockletDataMapModel.java     |  12 +
 .../blockletindex/SegmentIndexFileStore.java    |  39 +++-
 .../core/indexstore/row/DataMapRow.java         |  13 +-
 .../core/indexstore/row/UnsafeDataMapRow.java   |   7 +-
 .../core/indexstore/schema/CarbonRowSchema.java |  10 +-
 .../core/metadata/SegmentFileStore.java         |  29 +++
 .../core/metadata/schema/table/TableInfo.java   |  24 ++
 .../TableStatusReadCommittedScope.java          |   4 +-
 .../core/util/BlockletDataMapUtil.java          | 180 ++++++++++++++
 .../carbondata/core/util/SessionParams.java     |   5 +
 .../core/util/path/CarbonTablePath.java         |   2 +-
 .../TestBlockletDataMapFactory.java             | 126 ++++++++++
 .../apache/carbondata/hadoop/CacheClient.java   |  49 ++++
 .../hadoop/api/AbstractDataMapJob.java          |  42 ++++
 .../hadoop/api/CarbonFileInputFormat.java       |   2 +-
 .../hadoop/api/CarbonInputFormat.java           |  27 ++-
 .../hadoop/api/CarbonTableInputFormat.java      |   2 +-
 .../carbondata/hadoop/api/DataMapJob.java       |   6 +
 .../hadoop/util/CarbonInputFormatUtil.java      |  43 +++-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |   1 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  15 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   4 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   3 +
 .../execution/command/CarbonHiveCommands.scala  |   9 +
 42 files changed, 1335 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
index 04fa18a..6df36fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/Cache.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.cache;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.carbondata.core.memory.MemoryException;
+
 /**
  * A semi-persistent mapping from keys to values. Cache entries are manually added using
  * #get(Key), #getAll(List<Keys>) , and are stored in the cache until
@@ -69,6 +71,14 @@ public interface Cache<K, V> {
   void invalidate(K key);
 
   /**
+   * This method will add the value to the cache for the given key
+   *
+   * @param key
+   * @param value
+   */
+  void put(K key, V value) throws IOException, MemoryException;
+
+  /**
    * Access count of Cacheable entry will be decremented
    *
    * @param keys

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index fb67208..83c7237 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -59,6 +59,12 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
     initThreadPoolSize();
   }
 
+  @Override
+  public void put(DictionaryColumnUniqueIdentifier key, Dictionary value) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+
   /**
    * This method will initialize the thread pool size to be used for creating the
    * max number of threads for a job

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f9bf220..56607b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1726,6 +1726,9 @@ public final class CarbonCommonConstants {
    */
   public static final String INDEX_COLUMNS = "INDEX_COLUMNS";
 
+  // Property to enable parallel datamap loading for a table
+  public static final String CARBON_LOAD_DATAMAPS_PARALLEL = "carbon.load.datamaps.parallel.";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
new file mode 100644
index 0000000..dba0840
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/CacheableDataMap.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * Interface for data map caching
+ */
+public interface CacheableDataMap {
+
+  /**
+   * Add the blockletDataMapIndexWrapper to cache for key tableBlockIndexUniqueIdentifier
+   *
+   * @param tableBlockIndexUniqueIdentifier
+   * @param blockletDataMapIndexWrapper
+   */
+  void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException;
+
+  /**
+   * Get all the uncached distributables from the list.
+   *
+   * @param distributables
+   * @return
+   */
+  List<DataMapDistributable> getAllUncachedDistributables(List<DataMapDistributable> distributables)
+      throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 537c635..d325f21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.datastore.block.SegmentTaskIndex;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -142,6 +143,12 @@ public class SegmentTaskIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
+  @Override
+  public void put(TableSegmentUniqueIdentifier key, SegmentTaskIndexWrapper value)
+      throws IOException, MemoryException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   /**
    * returns block timestamp value from the given task
    * @param taskKey

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 0419405..7255237 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -526,7 +527,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
   }
 
   @Override
-  public CarbonFile[] locationAwareListFiles() throws IOException {
+  public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
     if (null != fileStatus && fileStatus.isDirectory()) {
       List<FileStatus> listStatus = new ArrayList<>();
       Path path = fileStatus.getPath();
@@ -534,7 +535,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
           path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
       while (iter.hasNext()) {
         LocatedFileStatus fileStatus = iter.next();
-        if (fileStatus.getLen() > 0) {
+        if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
           listStatus.add(fileStatus);
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index eb65dfd..a104137 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 public interface CarbonFile {
@@ -41,7 +42,7 @@ public interface CarbonFile {
    * It returns list of files with location details.
    * @return
    */
-  CarbonFile[] locationAwareListFiles() throws IOException;
+  CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException;
 
   String getName();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index d28e85e..60b7e17 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -49,6 +49,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
@@ -448,7 +449,7 @@ public class LocalCarbonFile implements CarbonFile {
     return file.createNewFile();
   }
 
-  @Override public CarbonFile[] locationAwareListFiles() throws IOException {
+  @Override public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
     return listFiles();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
new file mode 100644
index 0000000..e6bc691
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+
+/**
+ * Store the data map row @{@link DataMapRow}
+ */
+public abstract class AbstractMemoryDMStore implements Serializable {
+
+  protected boolean isMemoryFreed;
+
+  protected CarbonRowSchema[] schema;
+
+  protected final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
+  public AbstractMemoryDMStore(CarbonRowSchema[] schema) {
+    this.schema = schema;
+  }
+
+  public abstract void addIndexRow(DataMapRow indexRow) throws MemoryException;
+
+  public abstract DataMapRow getDataMapRow(int index);
+
+  public abstract void freeMemory();
+
+  public abstract int getMemoryUsed();
+
+  public CarbonRowSchema[] getSchema() {
+    return schema;
+  }
+
+  public abstract int getRowCount();
+
+  public void finishWriting() throws MemoryException {
+    // do nothing in default implementation
+  }
+
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    throw new UnsupportedOperationException("Operation not allowed");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 167a04e..ba4193e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,26 +29,19 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
 /**
  * Class to handle loading, unloading,clearing,storing of the table
  * blocks
  */
 public class BlockletDataMapIndexStore
-    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMap> {
+    implements Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMapIndexStore.class.getName());
   /**
@@ -76,106 +68,93 @@ public class BlockletDataMapIndexStore
   }
 
   @Override
-  public BlockletDataMap get(TableBlockIndexUniqueIdentifier identifier)
+  public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifier identifier)
       throws IOException {
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
-    BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
-    if (dataMap == null) {
+    BlockletDataMapIndexWrapper blockletDataMapIndexWrapper =
+        (BlockletDataMapIndexWrapper) lruCache.get(lruCacheKey);
+    List<BlockletDataMap> dataMaps = new ArrayList<>();
+    if (blockletDataMapIndexWrapper == null) {
       try {
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
         Set<String> filesRead = new HashSet<>();
-        Map<String, BlockMetaInfo> blockMetaInfoMap =
-            getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
-        dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
-      } catch (MemoryException e) {
+        long memorySize = 0L;
+        String segmentFilePath = identifier.getIndexFilePath();
+        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil
+            .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+        // if the identifier is not a merge file we can directly load the datamaps
+        if (identifier.getMergeIndexFileName() == null) {
+          Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+              .getBlockMetaInfoMap(identifier, indexFileStore, filesRead,
+                  carbonDataFileBlockMetaInfoMapping);
+          BlockletDataMap blockletDataMap =
+              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
+          memorySize += blockletDataMap.getMemorySize();
+          dataMaps.add(blockletDataMap);
+          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+        } else {
+          // if the identifier is a merge file then collect the index files and load the datamaps
+          List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+              BlockletDataMapUtil.getIndexFileIdentifiersFromMergeFile(identifier, indexFileStore);
+          for (TableBlockIndexUniqueIdentifier blockIndexUniqueIdentifier :
+              tableBlockIndexUniqueIdentifiers) {
+            Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
+                .getBlockMetaInfoMap(blockIndexUniqueIdentifier, indexFileStore, filesRead,
+                    carbonDataFileBlockMetaInfoMapping);
+            BlockletDataMap blockletDataMap =
+                loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap);
+            memorySize += blockletDataMap.getMemorySize();
+            dataMaps.add(blockletDataMap);
+          }
+          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+        }
+        lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
+            memorySize);
+      } catch (Throwable e) {
+        // clear all the memory used by datamaps loaded
+        for (DataMap dataMap : dataMaps) {
+          dataMap.clear();
+        }
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
       }
     }
-    return dataMap;
-  }
-
-  private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
-      SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
-    if (identifier.getMergeIndexFileName() != null) {
-      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getMergeIndexFileName());
-      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
-        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
-        filesRead.add(indexMergeFile.getPath());
-      }
-    }
-    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
-      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
-          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getIndexFileName()) });
-    }
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
-    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
-        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
-    for (DataFileFooter footer : indexInfo) {
-      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (FileFactory.isFileExist(blockPath)) {
-        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
-      } else {
-        LOGGER.warn("Skipping invalid block " + footer.getBlockInfo().getBlockUniqueName()
-            + " The block does not exist. The block might be got deleted due to clean up post"
-            + " update/delete operation over table.");
-      }
-    }
-    return blockMetaInfoMap;
-  }
-
-  private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
-    if (carbonFile instanceof AbstractDFSCarbonFile) {
-      RemoteIterator<LocatedFileStatus> iter =
-          ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
-      LocatedFileStatus fileStatus = iter.next();
-      String[] location = fileStatus.getBlockLocations()[0].getHosts();
-      long len = fileStatus.getLen();
-      return new BlockMetaInfo(location, len);
-    } else {
-      return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
-    }
+    return blockletDataMapIndexWrapper;
   }
 
   @Override
-  public List<BlockletDataMap> getAll(
+  public List<BlockletDataMapIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
-    List<BlockletDataMap> blockletDataMaps =
+    List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifier> missedIdentifiers = new ArrayList<>();
+    BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = null;
     // Get the datamaps for each indexfile from cache.
     try {
       for (TableBlockIndexUniqueIdentifier identifier : tableSegmentUniqueIdentifiers) {
-        BlockletDataMap ifPresent = getIfPresent(identifier);
-        if (ifPresent != null) {
-          blockletDataMaps.add(ifPresent);
+        BlockletDataMapIndexWrapper dataMapIndexWrapper = getIfPresent(identifier);
+        if (dataMapIndexWrapper != null) {
+          blockletDataMapIndexWrappers.add(dataMapIndexWrapper);
         } else {
           missedIdentifiers.add(identifier);
         }
       }
       if (missedIdentifiers.size() > 0) {
-        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-        Set<String> filesRead = new HashSet<>();
-        for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
-          Map<String, BlockMetaInfo> blockMetaInfoMap =
-              getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
-          blockletDataMaps.add(
-              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
+        for (TableBlockIndexUniqueIdentifier identifier : missedIdentifiers) {
+          blockletDataMapIndexWrapper = get(identifier);
+          blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
         }
       }
     } catch (Throwable e) {
-      for (BlockletDataMap dataMap : blockletDataMaps) {
-        dataMap.clear();
+      if (null != blockletDataMapIndexWrapper) {
+        List<BlockletDataMap> dataMaps = blockletDataMapIndexWrapper.getDataMaps();
+        for (DataMap dataMap : dataMaps) {
+          dataMap.clear();
+        }
       }
       throw new IOException("Problem in loading segment blocks.", e);
     }
-    return blockletDataMaps;
+    return blockletDataMapIndexWrappers;
   }
 
   /**
@@ -185,9 +164,9 @@ public class BlockletDataMapIndexStore
    * @return
    */
   @Override
-  public BlockletDataMap getIfPresent(
+  public BlockletDataMapIndexWrapper getIfPresent(
       TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) {
-    return (BlockletDataMap) lruCache.get(
+    return (BlockletDataMapIndexWrapper) lruCache.get(
         tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
@@ -201,6 +180,44 @@ public class BlockletDataMapIndexStore
     lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
   }
 
+  @Override
+  public void put(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper wrapper) throws IOException, MemoryException {
+    String uniqueTableSegmentIdentifier =
+        tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier();
+    Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier);
+    if (lock == null) {
+      lock = addAndGetSegmentLock(uniqueTableSegmentIdentifier);
+    }
+    long memorySize = 0L;
+    // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
+    // as in that case clearing unsafe memory need to be taken card. If at all datamap entry
+    // in the cache need to be overwritten then use the invalidate interface
+    // and then use the put interface
+    if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+      synchronized (lock) {
+        if (null == getIfPresent(tableBlockIndexUniqueIdentifier)) {
+          List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+          try {
+            for (BlockletDataMap blockletDataMap: dataMaps) {
+              blockletDataMap.convertToUnsafeDMStore();
+              memorySize += blockletDataMap.getMemorySize();
+            }
+            lruCache.put(tableBlockIndexUniqueIdentifier.getUniqueTableSegmentIdentifier(), wrapper,
+                memorySize);
+          } catch (Throwable e) {
+            // clear all the memory acquired by data map in case of any failure
+            for (DataMap blockletDataMap : dataMaps) {
+              blockletDataMap.clear();
+            }
+            throw new IOException("Problem in adding datamap to cache.", e);
+          }
+        }
+      }
+    }
+  }
+
+
   /**
    * Below method will be used to load the segment of segments
    * One segment may have multiple task , so  table segment will be loaded
@@ -228,8 +245,6 @@ public class BlockletDataMapIndexStore
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
           blockMetaInfoMap, identifier.getSegmentId()));
-      lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
-          dataMap.getMemorySize());
     }
     return dataMap;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
new file mode 100644
index 0000000..d674cb4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
+
+/**
+ * A cacheable wrapper around the list of {@link BlockletDataMap}s loaded for one
+ * index file, allowing the datamaps to be kept in the LRU cache as a single entry.
+ */
+public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
+
+  // datamaps of a single index file wrapped by this cache entry
+  private List<BlockletDataMap> dataMaps;
+
+  public BlockletDataMapIndexWrapper(List<BlockletDataMap> dataMaps) {
+    this.dataMaps = dataMaps;
+  }
+
+  // No file timestamp is tracked for this entry type.
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  // Access count is not maintained for this wrapper.
+  @Override public int getAccessCount() {
+    return 0;
+  }
+
+  // Returns 0 because the memory size is computed and supplied to the LRU cache
+  // explicitly at put time (see BlockletDataMapIndexStore.put).
+  @Override public long getMemorySize() {
+    return 0;
+  }
+
+  public List<BlockletDataMap> getDataMaps() {
+    return dataMaps;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 660f4c1..8bae7fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -22,20 +22,29 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 import org.apache.hadoop.io.Writable;
-import org.xerial.snappy.Snappy;
 
 /**
  * Blocklet detail information to be sent to each executor
  */
 public class BlockletDetailInfo implements Serializable, Writable {
 
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BlockletDetailInfo.class.getName());
+
+  private static final long serialVersionUID = 7957493757421513808L;
+
   private int rowCount;
 
   private short pagesCount;
@@ -50,6 +59,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
 
   private BlockletInfo blockletInfo;
 
+  private byte[] blockletInfoBinary;
+
   private long blockFooterOffset;
 
   private List<ColumnSchema> columnSchemas;
@@ -83,6 +94,13 @@ public class BlockletDetailInfo implements Serializable, Writable {
   }
 
   public BlockletInfo getBlockletInfo() {
+    if (null == blockletInfo) {
+      try {
+        setBlockletInfoFromBinary();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
     return blockletInfo;
   }
 
@@ -90,6 +108,26 @@ public class BlockletDetailInfo implements Serializable, Writable {
     this.blockletInfo = blockletInfo;
   }
 
+  /**
+   * Lazily deserializes {@link BlockletInfo} from its serialized binary form.
+   * No-op when the info is already materialized or no binary data is present.
+   *
+   * @throws IOException if the binary form cannot be deserialized
+   */
+  private void setBlockletInfoFromBinary() throws IOException {
+    if (null == this.blockletInfo && null != blockletInfoBinary && blockletInfoBinary.length > 0) {
+      BlockletInfo info = new BlockletInfo();
+      // try-with-resources guarantees the stream is closed on every path
+      try (DataInputStream inputStream =
+          new DataInputStream(new ByteArrayInputStream(blockletInfoBinary))) {
+        info.readFields(inputStream);
+      } catch (IOException e) {
+        LOGGER.error("Problem in reading blocklet info");
+        // chain the original exception instead of flattening it to a message string
+        throw new IOException("Problem in reading blocklet info.", e);
+      }
+      // assign only after a fully successful read so a failed attempt does not
+      // leave a partially-initialized object behind
+      blockletInfo = info;
+    }
+  }
+
   public int[] getDimLens() {
     return dimLens;
   }
@@ -131,6 +169,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
     out.writeLong(blockFooterOffset);
     out.writeInt(columnSchemaBinary.length);
     out.write(columnSchemaBinary);
+    out.writeInt(blockletInfoBinary.length);
+    out.write(blockletInfoBinary);
     out.writeLong(blockSize);
   }
 
@@ -153,6 +193,10 @@ public class BlockletDetailInfo implements Serializable, Writable {
     byte[] schemaArray = new byte[bytesSize];
     in.readFully(schemaArray);
     readColumnSchema(schemaArray);
+    int byteSize = in.readInt();
+    blockletInfoBinary = new byte[byteSize];
+    in.readFully(blockletInfoBinary);
+    setBlockletInfoFromBinary();
     blockSize = in.readLong();
   }
 
@@ -162,17 +206,8 @@ public class BlockletDetailInfo implements Serializable, Writable {
    * @throws IOException
    */
   public void readColumnSchema(byte[] schemaArray) throws IOException {
-    // uncompress it.
-    schemaArray = Snappy.uncompress(schemaArray);
-    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
-    DataInput schemaInput = new DataInputStream(schemaStream);
-    columnSchemas = new ArrayList<>();
-    int size = schemaInput.readShort();
-    for (int i = 0; i < size; i++) {
-      ColumnSchema columnSchema = new ColumnSchema();
-      columnSchema.readFields(schemaInput);
-      columnSchemas.add(columnSchema);
-    }
+    BlockletDataMap blockletDataMap = new BlockletDataMap();
+    columnSchemas = blockletDataMap.readColumnSchema(schemaArray);
   }
 
   /**
@@ -223,4 +258,9 @@ public class BlockletDetailInfo implements Serializable, Writable {
   public byte[] getColumnSchemaBinary() {
     return columnSchemaBinary;
   }
+
+  public void setBlockletInfoBinary(byte[] blockletInfoBinary) {
+    this.blockletInfoBinary = blockletInfoBinary;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
new file mode 100644
index 0000000..d7a1b8f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.indexstore.row.DataMapRow;
+import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Store the data map row {@link DataMapRow} data to memory (on-heap, "safe"
+ * counterpart of the unsafe store).
+ */
+public class SafeMemoryDMStore extends AbstractMemoryDMStore {
+
+  /**
+   * holds all blocklets metadata in memory
+   */
+  private List<DataMapRow> dataMapRows =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  // running total of row sizes in bytes, reported via getMemoryUsed()
+  private int runningLength;
+
+  public SafeMemoryDMStore(CarbonRowSchema[] schema) {
+    super(schema);
+  }
+
+  /**
+   * Add the index row to dataMapRows, basically to in memory.
+   *
+   * @param indexRow row to retain in memory
+   * @throws MemoryException declared to match the store contract
+   */
+  @Override
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
+    dataMapRows.add(indexRow);
+    runningLength += indexRow.getTotalSizeInBytes();
+  }
+
+  // NOTE(review): after freeMemory() this (and getRowCount) would NPE because
+  // dataMapRows is set to null -- callers must not use the store once freed.
+  @Override
+  public DataMapRow getDataMapRow(int index) {
+    assert (index < dataMapRows.size());
+    return dataMapRows.get(index);
+  }
+
+  @Override
+  public void freeMemory() {
+    if (!isMemoryFreed) {
+      if (null != dataMapRows) {
+        dataMapRows.clear();
+        dataMapRows = null;
+      }
+      isMemoryFreed = true;
+    }
+  }
+
+  @Override
+  public int getMemoryUsed() {
+    return runningLength;
+  }
+
+  @Override
+  public int getRowCount() {
+    return dataMapRows.size();
+  }
+
+  /**
+   * Copies all retained rows into a freshly-allocated unsafe store and finishes
+   * writing it. The safe store itself is left untouched.
+   */
+  @Override
+  public UnsafeMemoryDMStore convertToUnsafeDMStore() throws MemoryException {
+    setSchemaDataType();
+    UnsafeMemoryDMStore unsafeMemoryDMStore = new UnsafeMemoryDMStore(schema);
+    for (DataMapRow dataMapRow : dataMapRows) {
+      unsafeMemoryDMStore.addIndexRow(dataMapRow);
+    }
+    unsafeMemoryDMStore.finishWriting();
+    return unsafeMemoryDMStore;
+  }
+
+  /**
+   * Set the dataType to the schema. Needed in case of serialization / deserialization
+   */
+  private void setSchemaDataType() {
+    for (CarbonRowSchema carbonRowSchema : schema) {
+      carbonRowSchema.setDataType(DataTypeUtil.valueOf(carbonRowSchema.getDataType(), 0, 0));
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index c907fa8..3226ceb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.indexstore;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -24,7 +25,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 /**
  * Class holds the indexFile information to uniquely identify the carbon index
  */
-public class TableBlockIndexUniqueIdentifier {
+public class TableBlockIndexUniqueIdentifier implements Serializable {
+
+  private static final long serialVersionUID = 5808112137916196344L;
 
   private String indexFilePath;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 31ecac2..ca5e2dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -32,9 +31,11 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
 /**
  * Store the data map row @{@link DataMapRow} data to unsafe.
  */
-public class UnsafeMemoryDMStore {
+public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
-  private MemoryBlock memoryBlock;
+  private static final long serialVersionUID = -5344592407101055335L;
+
+  private transient MemoryBlock memoryBlock;
 
   private static int capacity = 8 * 1024;
 
@@ -42,18 +43,12 @@ public class UnsafeMemoryDMStore {
 
   private int runningLength;
 
-  private boolean isMemoryFreed;
-
-  private CarbonRowSchema[] schema;
-
   private int[] pointers;
 
   private int rowCount;
 
-  private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
-
   public UnsafeMemoryDMStore(CarbonRowSchema[] schema) throws MemoryException {
-    this.schema = schema;
+    super(schema);
     this.allocatedSize = capacity;
     this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize);
     this.pointers = new int[1000];
@@ -92,7 +87,7 @@ public class UnsafeMemoryDMStore {
    * @param indexRow
    * @return
    */
-  public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
+  public void addIndexRow(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
     int rowSize = indexRow.getTotalSizeInBytes();
     // Check whether allocated memory is sufficient or not.
@@ -172,7 +167,7 @@ public class UnsafeMemoryDMStore {
     }
   }
 
-  public UnsafeDataMapRow getUnsafeRow(int index) {
+  public DataMapRow getDataMapRow(int index) {
     assert (index < rowCount);
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }
@@ -205,12 +200,8 @@ public class UnsafeMemoryDMStore {
     return runningLength;
   }
 
-  public CarbonRowSchema[] getSchema() {
-    return schema;
-  }
-
   public int getRowCount() {
     return rowCount;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index f72dc06..3ff9cdc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -18,10 +18,12 @@ package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -32,18 +34,19 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -76,11 +79,13 @@ import org.xerial.snappy.Snappy;
 /**
  * Datamap implementation for blocklet.
  */
-public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
+public class BlockletDataMap extends CoarseGrainDataMap implements Serializable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BlockletDataMap.class.getName());
 
+  private static final long serialVersionUID = -2170289352240810993L;
+
   private static int KEY_INDEX = 0;
 
   private static int MIN_VALUES_INDEX = 1;
@@ -119,14 +124,17 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
   private static int SEGMENTID = 5;
 
-  private UnsafeMemoryDMStore unsafeMemoryDMStore;
+  private AbstractMemoryDMStore memoryDMStore;
 
-  private UnsafeMemoryDMStore unsafeMemorySummaryDMStore;
+  private AbstractMemoryDMStore summaryDMStore;
 
-  private SegmentProperties segmentProperties;
+  // As it is a heavy object it is not recommended to serialize this object
+  private transient SegmentProperties segmentProperties;
 
   private int[] columnCardinality;
 
+  private long blockletSchemaTime;
+
   @Override
   public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
@@ -150,11 +158,12 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       if (segmentProperties == null) {
         List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
         schemaBinary = convertSchemaToBinary(columnInTable);
+        blockletSchemaTime = fileFooter.getSchemaUpdatedTimeStamp();
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
-        createSchema(segmentProperties);
+        createSchema(segmentProperties, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
         createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
-            segmentId);
+            segmentId, ((BlockletDataMapModel) dataMapModel).isAddToUnsafe());
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -185,21 +194,23 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
         }
       }
     }
-    if (unsafeMemoryDMStore != null) {
-      unsafeMemoryDMStore.finishWriting();
+    if (memoryDMStore != null) {
+      memoryDMStore.finishWriting();
     }
-    if (null != unsafeMemorySummaryDMStore) {
+    if (null != summaryDMStore) {
       addTaskSummaryRowToUnsafeMemoryStore(
           summaryRow,
           schemaBinary,
           filePath,
           fileName,
           segmentId);
-      unsafeMemorySummaryDMStore.finishWriting();
+      summaryDMStore.finishWriting();
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + " is "
+              + (System.currentTimeMillis() - startTime));
     }
-    LOGGER.info(
-        "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
-            System.currentTimeMillis() - startTime));
   }
 
   private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
@@ -207,10 +218,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
-    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (!blockletList.isEmpty() && summaryRow == null) {
-      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
     }
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
@@ -226,7 +237,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
       // compute and set task level min values
       addTaskMinMaxValues(summaryRow, minMaxLen,
-          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+          summaryDMStore.getSchema()[taskMinMaxOrdinal], minValues,
           TASK_MIN_VALUES_INDEX, true);
       ordinal++;
       taskMinMaxOrdinal++;
@@ -234,7 +245,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
       // compute and set task level max values
       addTaskMinMaxValues(summaryRow, minMaxLen,
-          unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+          summaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
           TASK_MAX_VALUES_INDEX, false);
       ordinal++;
 
@@ -269,7 +280,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
         row.setShort((short) relativeBlockletId++, ordinal++);
         // Store block size
         row.setLong(blockMetaInfo.getSize(), ordinal);
-        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+        memoryDMStore.addIndexRow(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -295,10 +306,10 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       BlockMetaInfo blockMetaInfo) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
-    CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    CarbonRowSchema[] schema = memoryDMStore.getSchema();
     // Add one row to maintain task level min max for segment pruning
     if (summaryRow == null) {
-      summaryRow = new DataMapRowImpl(unsafeMemorySummaryDMStore.getSchema());
+      summaryRow = new DataMapRowImpl(summaryDMStore.getSchema());
     }
     DataMapRow row = new DataMapRowImpl(schema);
     int ordinal = 0;
@@ -317,14 +328,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal);
     // compute and set task level min values
     addTaskMinMaxValues(summaryRow, minMaxLen,
-        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
+        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues,
         TASK_MIN_VALUES_INDEX, true);
     ordinal++;
     taskMinMaxOrdinal++;
     row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal);
     // compute and set task level max values
     addTaskMinMaxValues(summaryRow, minMaxLen,
-        unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
+        summaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues,
         TASK_MAX_VALUES_INDEX, false);
     ordinal++;
 
@@ -357,7 +368,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
       // store block size
       row.setLong(blockMetaInfo.getSize(), ordinal);
-      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      memoryDMStore.addIndexRow(row);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -378,7 +389,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
       summaryRow.setByteArray(segmentId, SEGMENTID);
       try {
-        unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
+        summaryDMStore.addIndexRow(summaryRow);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -516,7 +527,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     taskMinMaxRow.setRow(row, ordinal);
   }
 
-  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
+  private void createSchema(SegmentProperties segmentProperties, boolean addToUnsafe)
+      throws MemoryException {
     List<CarbonRowSchema> indexSchemas = new ArrayList<>();
 
     // Index key
@@ -553,8 +565,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     // for storing block length.
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
 
-    unsafeMemoryDMStore =
-        new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
+    CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
+    memoryDMStore = getMemoryDMStore(schema, addToUnsafe);
   }
 
   /**
@@ -565,7 +577,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
    * @throws MemoryException
    */
   private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
-      byte[] filePath, byte[] fileName, byte[] segmentId)
+      byte[] filePath, byte[] fileName, byte[] segmentId, boolean addToUnsafe)
       throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
@@ -581,8 +593,9 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     // for storing segmentid
     taskMinMaxSchemas.add(
         new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
-    unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
-        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+    CarbonRowSchema[] schema =
+        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]);
+    summaryDMStore = getMemoryDMStore(schema, addToUnsafe);
   }
 
   private void getMinMaxSchema(SegmentProperties segmentProperties,
@@ -611,8 +624,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   public boolean isScanRequired(FilterResolverIntf filterExp) {
     FilterExecuter filterExecuter =
         FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-    for (int i = 0; i < unsafeMemorySummaryDMStore.getRowCount(); i++) {
-      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(i);
+    for (int i = 0; i < summaryDMStore.getRowCount(); i++) {
+      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(i);
       boolean isScanRequired = FilterExpressionProcessor.isScanRequired(
           filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
           getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
@@ -624,26 +637,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   }
 
   private List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties) {
-    if (unsafeMemoryDMStore.getRowCount() == 0) {
+    if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
     List<Blocklet> blocklets = new ArrayList<>();
     int numBlocklets = 0;
     if (filterExp == null) {
-      numBlocklets = unsafeMemoryDMStore.getRowCount();
+      numBlocklets = memoryDMStore.getRowCount();
       for (int i = 0; i < numBlocklets; i++) {
-        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+        DataMapRow safeRow = memoryDMStore.getDataMapRow(i).convertToSafeRow();
         blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)));
       }
     } else {
       // Remove B-tree jump logic as start and end key prepared is not
       // correct for old store scenarios
       int startIndex = 0;
-      numBlocklets = unsafeMemoryDMStore.getRowCount();
+      numBlocklets = memoryDMStore.getRowCount();
       FilterExecuter filterExecuter =
           FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
       while (startIndex < numBlocklets) {
-        DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+        DataMapRow safeRow = memoryDMStore.getDataMapRow(startIndex).convertToSafeRow();
         int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX);
         String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
             CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
@@ -663,7 +676,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions) {
-    if (unsafeMemoryDMStore.getRowCount() == 0) {
+    if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
     // if it has partitioned datamap but there is no partitioned information stored, it means
@@ -740,10 +753,26 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
   public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
     int index = Integer.parseInt(blockletId);
-    DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+    DataMapRow safeRow = memoryDMStore.getDataMapRow(index).convertToSafeRow();
     return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX));
   }
 
+  /**
+   * Get the index file name of the blocklet data map
+   *
+   * @return index file name stored in the first summary row, decoded with the
+   *         default charset
+   */
+  public String getIndexFileName() {
+    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
+    try {
+      return new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
+          CarbonCommonConstants.DEFAULT_CHARSET);
+    } catch (UnsupportedEncodingException e) {
+      // should never happen!
+      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
+    }
+  }
+
   private byte[][] getMinMaxValue(DataMapRow row, int index) {
     DataMapRow minMaxRow = row.getRow(index);
     byte[][] minMax = new byte[minMaxRow.getColumnCount()][];
@@ -764,23 +793,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     detailInfo.setBlockletId((short) blockletId);
     detailInfo.setDimLens(columnCardinality);
     detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
-    byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
-    BlockletInfo blockletInfo = null;
+    detailInfo.setBlockletInfoBinary(row.getByteArray(BLOCK_INFO_INDEX));
     try {
-      if (byteArray.length > 0) {
-        blockletInfo = new BlockletInfo();
-        ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
-        DataInputStream inputStream = new DataInputStream(stream);
-        blockletInfo.readFields(inputStream);
-        inputStream.close();
-      }
       blocklet.setLocation(
           new String(row.getByteArray(LOCATIONS), CarbonCommonConstants.DEFAULT_CHARSET)
               .split(","));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    detailInfo.setBlockletInfo(blockletInfo);
     blocklet.setDetailInfo(detailInfo);
     detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
     detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
@@ -791,7 +811,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   private String[] getFileDetails() {
     try {
       String[] fileDetails = new String[3];
-      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+      DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
       fileDetails[0] =
           new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
       fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
@@ -815,14 +835,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   private int findStartIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
     int childNodeIndex;
     int low = 0;
-    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int high = memoryDMStore.getRowCount() - 1;
     int mid = 0;
     int compareRes = -1;
     //
     while (low <= high) {
       mid = (low + high) >>> 1;
       // compare the entries
-      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
       if (compareRes < 0) {
         high = mid - 1;
       } else if (compareRes > 0) {
@@ -831,7 +851,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
         // if key is matched then get the first entry
         int currentPos = mid;
         while (currentPos - 1 >= 0
-            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos - 1)) == 0) {
+            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos - 1)) == 0) {
           currentPos--;
         }
         mid = currentPos;
@@ -863,14 +883,14 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
   private int findEndIndex(DataMapRow key, Comparator<DataMapRow> comparator) {
     int childNodeIndex;
     int low = 0;
-    int high = unsafeMemoryDMStore.getRowCount() - 1;
+    int high = memoryDMStore.getRowCount() - 1;
     int mid = 0;
     int compareRes = -1;
     //
     while (low <= high) {
       mid = (low + high) >>> 1;
       // compare the entries
-      compareRes = comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(mid));
+      compareRes = comparator.compare(key, memoryDMStore.getDataMapRow(mid));
       if (compareRes < 0) {
         high = mid - 1;
       } else if (compareRes > 0) {
@@ -878,8 +898,8 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
       } else {
         int currentPos = mid;
         // if key is matched then get the first entry
-        while (currentPos + 1 < unsafeMemoryDMStore.getRowCount()
-            && comparator.compare(key, unsafeMemoryDMStore.getUnsafeRow(currentPos + 1)) == 0) {
+        while (currentPos + 1 < memoryDMStore.getRowCount()
+            && comparator.compare(key, memoryDMStore.getDataMapRow(currentPos + 1)) == 0) {
           currentPos++;
         }
         mid = currentPos;
@@ -907,13 +927,13 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     buffer.putInt(key.getNoDictionaryKeys().length);
     buffer.put(key.getDictionaryKeys());
     buffer.put(key.getNoDictionaryKeys());
-    DataMapRowImpl dataMapRow = new DataMapRowImpl(unsafeMemoryDMStore.getSchema());
+    DataMapRowImpl dataMapRow = new DataMapRowImpl(memoryDMStore.getSchema());
     dataMapRow.setByteArray(buffer.array(), 0);
     return dataMapRow;
   }
 
   private byte[] getColumnSchemaBinary() {
-    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+    DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
     return unsafeRow.getByteArray(SCHEMA);
   }
 
@@ -937,36 +957,25 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
 
   @Override
   public void clear() {
-    if (unsafeMemoryDMStore != null) {
-      unsafeMemoryDMStore.freeMemory();
-      unsafeMemoryDMStore = null;
+    if (memoryDMStore != null) {
+      memoryDMStore.freeMemory();
+      memoryDMStore = null;
       segmentProperties = null;
     }
     // clear task min/max unsafe memory
-    if (null != unsafeMemorySummaryDMStore) {
-      unsafeMemorySummaryDMStore.freeMemory();
-      unsafeMemorySummaryDMStore = null;
+    if (null != summaryDMStore) {
+      summaryDMStore.freeMemory();
+      summaryDMStore = null;
     }
   }
 
-  @Override
-  public long getFileTimeStamp() {
-    return 0;
-  }
-
-  @Override
-  public int getAccessCount() {
-    return 0;
-  }
-
-  @Override
   public long getMemorySize() {
     long memoryUsed = 0L;
-    if (unsafeMemoryDMStore != null) {
-      memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+    if (memoryDMStore != null) {
+      memoryUsed += memoryDMStore.getMemoryUsed();
     }
-    if (null != unsafeMemorySummaryDMStore) {
-      memoryUsed += unsafeMemorySummaryDMStore.getMemoryUsed();
+    if (null != summaryDMStore) {
+      memoryUsed += summaryDMStore.getMemoryUsed();
     }
     return memoryUsed;
   }
@@ -975,4 +984,65 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Cacheable {
     return segmentProperties;
   }
 
+  public void setSegmentProperties(SegmentProperties segmentProperties) {
+    this.segmentProperties = segmentProperties;
+  }
+
+  public int[] getColumnCardinality() {
+    return columnCardinality;
+  }
+
+  private AbstractMemoryDMStore getMemoryDMStore(CarbonRowSchema[] schema, boolean addToUnsafe)
+      throws MemoryException {
+    AbstractMemoryDMStore memoryDMStore;
+    if (addToUnsafe) {
+      memoryDMStore = new UnsafeMemoryDMStore(schema);
+    } else {
+      memoryDMStore = new SafeMemoryDMStore(schema);
+    }
+    return memoryDMStore;
+  }
+
+  /**
+   * This method will convert safe to unsafe memory DM store
+   *
+   * @throws MemoryException
+   */
+  public void convertToUnsafeDMStore() throws MemoryException {
+    if (memoryDMStore instanceof SafeMemoryDMStore) {
+      UnsafeMemoryDMStore unsafeMemoryDMStore = memoryDMStore.convertToUnsafeDMStore();
+      memoryDMStore.freeMemory();
+      memoryDMStore = unsafeMemoryDMStore;
+    }
+    if (summaryDMStore instanceof SafeMemoryDMStore) {
+      UnsafeMemoryDMStore unsafeSummaryMemoryDMStore = summaryDMStore.convertToUnsafeDMStore();
+      summaryDMStore.freeMemory();
+      summaryDMStore = unsafeSummaryMemoryDMStore;
+    }
+  }
+
+  /**
+   * Read column schema from binary
+   * @param schemaArray
+   * @throws IOException
+   */
+  public List<ColumnSchema> readColumnSchema(byte[] schemaArray) throws IOException {
+    // uncompress it.
+    schemaArray = Snappy.uncompress(schemaArray);
+    ByteArrayInputStream schemaStream = new ByteArrayInputStream(schemaArray);
+    DataInput schemaInput = new DataInputStream(schemaStream);
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    int size = schemaInput.readShort();
+    for (int i = 0; i < size; i++) {
+      ColumnSchema columnSchema = new ColumnSchema();
+      columnSchema.readFields(schemaInput);
+      columnSchemas.add(columnSchema);
+    }
+    return columnSchemas;
+  }
+
+  public long getBlockletSchemaTime() {
+    return blockletSchemaTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 99e48a5..7cdf77d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.indexstore.blockletindex;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 
 /**
  * This class contains required information to make the Blocklet datamap distributable.
@@ -31,6 +32,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
    */
   private String filePath;
 
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
   public BlockletDataMapDistributable(String indexFilePath) {
     this.filePath = indexFilePath;
   }
@@ -38,4 +41,13 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
   public String getFilePath() {
     return filePath;
   }
+
+  public TableBlockIndexUniqueIdentifier getTableBlockIndexUniqueIdentifier() {
+    return tableBlockIndexUniqueIdentifier;
+  }
+
+  public void setTableBlockIndexUniqueIdentifier(
+      TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifiers) {
+    this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifiers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c0bc2a6..c3df721 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -38,15 +41,17 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -59,7 +64,7 @@ import org.apache.hadoop.fs.RemoteIterator;
  * Table map for blocklet
  */
 public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
-    implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
+    implements BlockletDetailsFetcher, SegmentPropertiesFetcher, CacheableDataMap {
 
   private static final String NAME = "clustered.btree.blocklet";
 
@@ -69,9 +74,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
-  private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
 
-  private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
 
   public BlockletDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
     super(carbonTable, dataMapSchema);
@@ -91,24 +96,27 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
-    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
+    Set<TableBlockIndexUniqueIdentifier> identifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
-    return cache.getAll(tableBlockIndexUniqueIdentifiers);
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+        new ArrayList<>(identifiers.size());
+    tableBlockIndexUniqueIdentifiers.addAll(identifiers);
+    List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
+        cache.getAll(tableBlockIndexUniqueIdentifiers);
+    for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
+      dataMaps.addAll(wrapper.getDataMaps());
+    }
+    return dataMaps;
   }
 
-  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+  private Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
       throws IOException {
-    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
-      tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      Map<String, String> indexFiles = segment.getCommittedIndexFile();
-      for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
-        Path indexFile = new Path(indexFileEntry.getKey());
-        tableBlockIndexUniqueIdentifiers.add(
-            new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
-                indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
-      }
+      tableBlockIndexUniqueIdentifiers =
+          BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
       segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
     }
     return tableBlockIndexUniqueIdentifiers;
@@ -130,7 +138,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       }
       return detailedBlocklets;
     }
-    List<TableBlockIndexUniqueIdentifier> identifiers =
+    Set<TableBlockIndexUniqueIdentifier> identifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
@@ -145,17 +153,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
-    List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+    Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
-  private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers,
+  private ExtendedBlocklet getExtendedBlocklet(Set<TableBlockIndexUniqueIdentifier> identifiers,
       Blocklet blocklet) throws IOException {
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getIndexFileName().startsWith(blocklet.getFilePath())) {
-        DataMap dataMap = cache.get(identifier);
-        return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+      BlockletDataMapIndexWrapper wrapper = cache.get(identifier);
+      List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+      for (DataMap dataMap : dataMaps) {
+        if (((BlockletDataMap) dataMap).getIndexFileName().startsWith(blocklet.getFilePath())) {
+          return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
+        }
       }
     }
     throw new IOException("Blocklet with blockid " + blocklet.getBlockletId() + " not found ");
@@ -166,23 +176,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> distributables = new ArrayList<>();
     try {
-      CarbonFile[] carbonIndexFiles;
-      if (segment.getSegmentFileName() == null) {
-        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
-      } else {
-        SegmentFileStore fileStore =
-            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        Map<String, String> indexFiles = fileStore.getIndexFiles();
-        carbonIndexFiles = new CarbonFile[indexFiles.size()];
-        int i = 0;
-        for (String indexFile : indexFiles.keySet()) {
-          carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
-        }
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+      CarbonFile[] carbonIndexFiles = new CarbonFile[tableBlockIndexUniqueIdentifiers.size()];
+      int identifierCounter = 0;
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+          tableBlockIndexUniqueIdentifiers) {
+        String indexFilePath = tableBlockIndexUniqueIdentifier.getIndexFilePath();
+        String fileName = tableBlockIndexUniqueIdentifier.getIndexFileName();
+        carbonIndexFiles[identifierCounter++] = FileFactory
+            .getCarbonFile(indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + fileName);
       }
       for (int i = 0; i < carbonIndexFiles.length; i++) {
         Path path = new Path(carbonIndexFiles[i].getPath());
-
         FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
         RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
         LocatedFileStatus fileStatus = iter.next();
@@ -205,13 +211,18 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   @Override
   public void clear(Segment segment) {
-    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
-        DataMap dataMap = cache.getIfPresent(blockIndex);
-        if (dataMap != null) {
-          cache.invalidate(blockIndex);
-          dataMap.clear();
+        BlockletDataMapIndexWrapper wrapper = cache.getIfPresent(blockIndex);
+        if (null != wrapper) {
+          List<BlockletDataMap> dataMaps = wrapper.getDataMaps();
+          for (DataMap dataMap : dataMaps) {
+            if (dataMap != null) {
+              cache.invalidate(blockIndex);
+              dataMap.clear();
+            }
+          }
         }
       }
     }
@@ -246,9 +257,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
                 segmentNo));
       }
     }
-    List<CoarseGrainDataMap> dataMaps;
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
     try {
-      dataMaps = cache.getAll(identifiers);
+      List<BlockletDataMapIndexWrapper> wrappers = cache.getAll(identifiers);
+      for (BlockletDataMapIndexWrapper wrapper : wrappers) {
+        dataMaps.addAll(wrapper.getDataMaps());
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -289,4 +303,29 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return false;
   }
 
+  @Override public void cache(TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier,
+      BlockletDataMapIndexWrapper blockletDataMapIndexWrapper) throws IOException, MemoryException {
+    cache.put(tableBlockIndexUniqueIdentifier, blockletDataMapIndexWrapper);
+  }
+
+  @Override
+  public List<DataMapDistributable> getAllUncachedDistributables(
+      List<DataMapDistributable> distributables) throws IOException {
+    List<DataMapDistributable> distributablesToBeLoaded = new ArrayList<>(distributables.size());
+    for (DataMapDistributable distributable : distributables) {
+      Segment segment = distributable.getSegment();
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+      // filter out the tableBlockIndexUniqueIdentifiers based on distributable
+      TableBlockIndexUniqueIdentifier validIdentifier = BlockletDataMapUtil
+          .filterIdentifiersBasedOnDistributable(tableBlockIndexUniqueIdentifiers,
+              (BlockletDataMapDistributable) distributable);
+      if (null == cache.getIfPresent(validIdentifier)) {
+        ((BlockletDataMapDistributable) distributable)
+            .setTableBlockIndexUniqueIdentifier(validIdentifier);
+        distributablesToBeLoaded.add(distributable);
+      }
+    }
+    return distributablesToBeLoaded;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index ebeb278..7443d15 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -32,6 +32,8 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private String segmentId;
 
+  private boolean addToUnsafe = true;
+
   public BlockletDataMapModel(String filePath, byte[] fileData,
       Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
     super(filePath);
@@ -40,6 +42,12 @@ public class BlockletDataMapModel extends DataMapModel {
     this.segmentId = segmentId;
   }
 
+  public BlockletDataMapModel(String filePath, byte[] fileData,
+      Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, boolean addToUnsafe) {
+    this(filePath, fileData, blockMetaInfoMap, segmentId);
+    this.addToUnsafe = addToUnsafe;
+  }
+
   public byte[] getFileData() {
     return fileData;
   }
@@ -51,4 +59,8 @@ public class BlockletDataMapModel extends DataMapModel {
   public String getSegmentId() {
     return segmentId;
   }
+
+  public boolean isAddToUnsafe() {
+    return addToUnsafe;
+  }
 }


Mime
View raw message