carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [38/52] [partial] incubator-carbondata git commit: move core package
Date Mon, 16 Jan 2017 14:53:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
new file mode 100644
index 0000000..ae5c9b4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension.v2;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Compressed dimension chunk reader class for version 2.
+ * Each dimension column chunk is stored as a metadata header (DataChunk2)
+ * followed by a compressed data page, an optional inverted-index (rowid) page
+ * and an optional RLE page; this reader decompresses those pages into
+ * DimensionColumnDataChunk instances.
+ */
+public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReader {
+
+  /**
+   * dimension chunks offset: file offset of each dimension column chunk,
+   * one entry per column; consecutive offsets are used to size a full-chunk read
+   */
+  private List<Long> dimensionChunksOffset;
+
+  /**
+   * dimension chunks length: byte length of each chunk's metadata header (DataChunk2)
+   */
+  private List<Short> dimensionChunksLength;
+
+  /**
+   * Constructor to get minimum parameter to create instance of this class
+   *
+   * @param blockletInfo        blocklet metadata holding the chunk offsets and header lengths
+   * @param eachColumnValueSize fixed byte size of each dimension column value
+   * @param filePath            carbon data file path the chunks are read from
+   */
+  public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
+      final int[] eachColumnValueSize, final String filePath) {
+    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
+    this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
+    this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+
+  }
+
+  /**
+   * Below method will be used to read the chunk based on block indexes
+   * Reading logic of below method is:
+   * Except last column all the column chunk can be read in group
+   * if not last column then read data of all the column present in block index
+   * together then process it.
+   * For last column read is separately and process
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes blocks range to be read; each entry is an inclusive
+   *                     [start, end] column index pair, assumed to be sorted
+   * @return dimension column chunks
+   * @throws IOException if reading from the file fails
+   */
+  @Override public DimensionColumnDataChunk[] readDimensionChunks(final FileHolder fileReader,
+      final int[][] blockIndexes) throws IOException {
+    // read the column chunk based on block index and add
+    DimensionColumnDataChunk[] dataChunks =
+        new DimensionColumnDataChunk[dimensionChunksOffset.size()];
+    // if blocklet index is empty then return empty data chunks
+    if (blockIndexes.length == 0) {
+      return dataChunks;
+    }
+    DimensionColumnDataChunk[] groupChunk = null;
+    int index = 0;
+    // iterate till block indexes -1 as block index will be in sorted order, so to avoid
+    // the last column reading in group
+    for (int i = 0; i < blockIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk = readDimensionChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    // check last index is present in block index, if it is present then read separately
+    // NOTE(review): this assumes that when the last column is requested it appears as a
+    // single-element range; a last range like [n-2, n-1] would take the group branch and
+    // readDimensionChunksInGroup would access dimensionChunksOffset.get(n) -- confirm callers
+    // always split the last column out
+    if (blockIndexes[blockIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+          readDimensionChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+    }
+    // otherwise read the data in group
+    else {
+      groupChunk = readDimensionChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+          blockIndexes[blockIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockIndexes[blockIndexes.length - 1][0];
+           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Below method will be used to read the chunk based on block index
+   *
+   * @param fileReader file reader to read the blocks from file
+   * @param blockIndex block to be read
+   * @return dimension column chunk
+   * @throws IOException if reading from the file fails
+   */
+  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader,
+      int blockIndex) throws IOException {
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+    DataChunk2 dimensionColumnChunk = null;
+    byte[] data = null;
+    // current read position inside the buffer that holds the chunk's pages
+    int copySourcePoint = 0;
+    byte[] dimensionChunk = null;
+    if (dimensionChunksOffset.size() - 1 == blockIndex) {
+      // last column: there is no next offset to size a single read, so first read the
+      // metadata header alone, then read the pages using the lengths recorded in it;
+      // copySourcePoint stays 0 because 'data' contains only the pages
+      dimensionChunk = fileReader.readByteArray(filePath, dimensionChunksOffset.get(blockIndex),
+          dimensionChunksLength.get(blockIndex));
+      dimensionColumnChunk = CarbonUtil
+          .readDataChunk(dimensionChunk, copySourcePoint, dimensionChunksLength.get(blockIndex));
+      int totalDimensionDataLength =
+          dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
+              + dimensionColumnChunk.rowid_page_length;
+      data = fileReader.readByteArray(filePath,
+          dimensionChunksOffset.get(blockIndex) + dimensionChunksLength.get(blockIndex),
+          totalDimensionDataLength);
+    } else {
+      // non-last column: read header + all pages in one IO using the next chunk's offset,
+      // then advance copySourcePoint past the header inside the buffer
+      long currentDimensionOffset = dimensionChunksOffset.get(blockIndex);
+      data = fileReader.readByteArray(filePath, currentDimensionOffset,
+          (int) (dimensionChunksOffset.get(blockIndex + 1) - currentDimensionOffset));
+      dimensionColumnChunk =
+          CarbonUtil.readDataChunk(data, copySourcePoint, dimensionChunksLength.get(blockIndex));
+      copySourcePoint += dimensionChunksLength.get(blockIndex);
+    }
+
+    // first read the data page and uncompress it
+    dataPage =
+        COMPRESSOR.unCompressByte(data, copySourcePoint, dimensionColumnChunk.data_page_length);
+    copySourcePoint += dimensionColumnChunk.data_page_length;
+    // if row id block is present then read the row id chunk and uncompress it
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+      invertedIndexes = CarbonUtil
+          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, data, numberComressor,
+              copySourcePoint);
+      copySourcePoint += dimensionColumnChunk.rowid_page_length;
+      // get the reverse index
+      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+    }
+    // if rle is applied then read the rle block chunk and then uncompress
+    //then actual data based on rle block
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+      rlePage =
+          numberComressor.unCompress(data, copySourcePoint, dimensionColumnChunk.rle_page_length);
+      // uncompress the data with rle indexes
+      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
+      rlePage = null;
+    }
+    // fill chunk attributes
+    DimensionColumnDataChunk columnDataChunk = null;
+
+    if (dimensionColumnChunk.isRowMajor()) {
+      // row-major chunk: values of a column group stored together, fixed length per row
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, eachColumnValueSize[blockIndex],
+          numberOfRows);
+    }
+    // if no dictionary column then first create a no dictionary column chunk
+    // and set to data chunk instance
+    else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+      columnDataChunk =
+          new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+              numberOfRows);
+    } else {
+      // to store fixed length column chunk values
+      columnDataChunk =
+          new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+              numberOfRows, eachColumnValueSize[blockIndex]);
+    }
+    return columnDataChunk;
+  }
+
+  /**
+   * Below method will be used to read the dimension chunks in group.
+   * This is to enhance the IO performance. Will read the data from start index
+   * to end index(including)
+   *
+   * @param fileReader      stream used for reading
+   * @param startBlockIndex start block index
+   * @param endBlockIndex   end block index (must not be the last column, since
+   *                        dimensionChunksOffset.get(endBlockIndex + 1) is read below)
+   * @return dimension column chunk array
+   */
+  private DimensionColumnDataChunk[] readDimensionChunksInGroup(FileHolder fileReader,
+      int startBlockIndex, int endBlockIndex) throws IOException {
+    // single IO covering all requested chunks: header + pages of each chunk back to back
+    long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+    byte[] data = fileReader.readByteArray(filePath, currentDimensionOffset,
+        (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+    int copySourcePoint = 0;
+    // read the column chunk based on block index and add
+    DimensionColumnDataChunk[] dataChunks =
+        new DimensionColumnDataChunk[endBlockIndex - startBlockIndex + 1];
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+    DataChunk2 dimensionColumnChunk = null;
+    int index = 0;
+    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+      // reset per-column state carried over from the previous iteration
+      invertedIndexes = null;
+      invertedIndexesReverse = null;
+      dimensionColumnChunk =
+          CarbonUtil.readDataChunk(data, copySourcePoint, dimensionChunksLength.get(i));
+      copySourcePoint += dimensionChunksLength.get(i);
+      // first read the data page and uncompress it
+      dataPage =
+          COMPRESSOR.unCompressByte(data, copySourcePoint, dimensionColumnChunk.data_page_length);
+      copySourcePoint += dimensionColumnChunk.data_page_length;
+      // if row id block is present then read the row id chunk and uncompress it
+      if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+        invertedIndexes = CarbonUtil
+            .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, data, numberComressor,
+                copySourcePoint);
+        copySourcePoint += dimensionColumnChunk.rowid_page_length;
+        // get the reverse index
+        invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+      }
+      // if rle is applied then read the rle block chunk and then uncompress
+      //then actual data based on rle block
+      if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+        // read and uncompress the rle block
+        rlePage =
+            numberComressor.unCompress(data, copySourcePoint, dimensionColumnChunk.rle_page_length);
+        copySourcePoint += dimensionColumnChunk.rle_page_length;
+        // uncompress the data with rle indexes
+        dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[i]);
+        rlePage = null;
+      }
+      // fill chunk attributes
+      DimensionColumnDataChunk columnDataChunk = null;
+      if (dimensionColumnChunk.isRowMajor()) {
+        // row-major chunk: values of a column group stored together, fixed length per row
+        columnDataChunk =
+            new ColumnGroupDimensionDataChunk(dataPage, eachColumnValueSize[i], numberOfRows);
+      }
+      // if no dictionary column then first create a no dictionary column chunk
+      // and set to data chunk instance
+      else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+        columnDataChunk =
+            new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+                numberOfRows);
+      } else {
+        // to store fixed length column chunk values
+        columnDataChunk =
+            new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+                numberOfRows, eachColumnValueSize[i]);
+      }
+      dataChunks[index++] = columnDataChunk;
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Below method will be used to check whether particular encoding is present
+   * in the dimension or not
+   *
+   * @param encodings list of encodings applied to the chunk
+   * @param encoding  encoding to search
+   * @return true if the encoding is present in the dimension chunk
+   */
+  private boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+    return encodings.contains(encoding);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
new file mode 100644
index 0000000..0c97c26
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure;
+
+import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
+
+/**
+ * Measure block reader abstract class.
+ * Holds the carbon data file path shared by all measure chunk reader versions.
+ */
+public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
+
+  /**
+   * file path from which measure blocks will be read
+   */
+  protected String filePath;
+
+  /**
+   * Constructor to get minimum parameter to create instance of this class
+   *
+   * @param filePath file from which data will be read
+   */
+  public AbstractMeasureChunkReader(String filePath) {
+    this.filePath = filePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
new file mode 100644
index 0000000..1f3fec0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure.v1;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.datastore.compression.ReaderCompressModel;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+/**
+ * Compressed measure chunk reader for carbon file format version 1.
+ * Reads a measure column's data page using the per-chunk metadata (DataChunk)
+ * and uncompresses it via the compression model recorded in the value encoder meta.
+ */
+public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChunkReader {
+
+  /**
+   * measure chunk have the information about the metadata present in the file
+   */
+  private final List<DataChunk> measureColumnChunks;
+
+  /**
+   * Constructor to get minimum parameter to create instance of this class
+   *
+   * @param blockletInfo BlockletInfo
+   * @param filePath     file from which data will be read
+   */
+  public CompressedMeasureChunkFileBasedReaderV1(final BlockletInfo blockletInfo,
+      final String filePath) {
+    super(filePath);
+    this.measureColumnChunks = blockletInfo.getMeasureColumnChunk();
+  }
+
+  /**
+   * Method to read the blocks data based on block indexes
+   *
+   * @param fileReader   file reader to read the blocks
+   * @param blockIndexes blocks to be read; each entry is an inclusive
+   *                     [start, end] column index pair
+   * @return measure data chunks
+   */
+  @Override public MeasureColumnDataChunk[] readMeasureChunks(final FileHolder fileReader,
+      final int[][] blockIndexes) throws IOException {
+    // v1 has no grouped read optimisation: every requested column is read individually
+    MeasureColumnDataChunk[] datChunk = new MeasureColumnDataChunk[measureColumnChunks.size()];
+    for (int i = 0; i < blockIndexes.length; i++) {
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        datChunk[j] = readMeasureChunk(fileReader, j);
+      }
+    }
+    return datChunk;
+  }
+
+  /**
+   * Method to read the blocks data based on block index
+   *
+   * @param fileReader file reader to read the blocks
+   * @param blockIndex block to be read
+   * @return measure data chunk
+   */
+  @Override public MeasureColumnDataChunk readMeasureChunk(final FileHolder fileReader,
+      final int blockIndex) throws IOException {
+    // build the decompression model from the first value encoder meta of this chunk
+    ValueEncoderMeta meta = measureColumnChunks.get(blockIndex).getValueEncoderMeta().get(0);
+    ReaderCompressModel compressModel = ValueCompressionUtil.getReaderCompressModel(meta);
+
+    ValueCompressionHolder values = compressModel.getValueCompressionHolder();
+    byte[] dataPage = fileReader
+            .readByteArray(filePath, measureColumnChunks.get(blockIndex).getDataPageOffset(),
+                    measureColumnChunks.get(blockIndex).getDataPageLength());
+
+    // unCompress data
+    values.uncompress(compressModel.getConvertedDataType(), dataPage, 0,
+            measureColumnChunks.get(blockIndex).getDataPageLength(), compressModel.getMantissa(),
+            compressModel.getMaxValue());
+
+    CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+
+    // create and set the data chunk
+    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+    datChunk.setMeasureDataHolder(measureDataHolder);
+    // set the null value indexes
+    datChunk
+        .setNullValueIndexHolder(measureColumnChunks.get(blockIndex).getNullValueIndexForColumn());
+    return datChunk;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
new file mode 100644
index 0000000..2731163
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure.v2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+
+/**
+ * Class to read the measure column data for version 2.
+ * Each measure chunk is stored as a metadata header (DataChunk2) followed by a
+ * compressed data page; the serialized encoder meta inside the header drives
+ * the decompression model.
+ */
+public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReader {
+
+  /**
+   * measure column chunks offset: file offset of each chunk, one entry per column;
+   * consecutive offsets are used to size a full-chunk read
+   */
+  private List<Long> measureColumnChunkOffsets;
+
+  /**
+   * measure column chunks length: byte length of each chunk's metadata header (DataChunk2)
+   */
+  private List<Short> measureColumnChunkLength;
+
+  /**
+   * Constructor to get minimum parameter to create instance of this class
+   *
+   * @param blockletInfo BlockletInfo
+   * @param filePath     file from which data will be read
+   */
+  public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
+      final String filePath) {
+    super(filePath);
+    this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
+    this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
+  }
+
+  /**
+   * Below method will be used to convert the thrift presence meta to wrapper
+   * presence meta
+   *
+   * @param presentMetadataThrift thrift presence meta read from the file
+   * @return wrapper presence meta
+   */
+  private static PresenceMeta getPresenceMeta(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    PresenceMeta presenceMeta = new PresenceMeta();
+    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+    // the null-value bitset is stored compressed in the file
+    presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+        .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
+    return presenceMeta;
+  }
+
+  /**
+   * Below method will be used to read the chunk based on block indexes
+   * Reading logic of below method is: Except last column all the column chunk
+   * can be read in group if not last column then read data of all the column
+   * present in block index together then process it. For last column read is
+   * separately and process
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes blocks range to be read; each entry is an inclusive
+   *                     [start, end] column index pair, assumed to be sorted
+   * @return measure column chunks
+   * @throws IOException if reading from the file fails
+   */
+  public MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    // read the column chunk based on block index and add
+    MeasureColumnDataChunk[] dataChunks =
+        new MeasureColumnDataChunk[measureColumnChunkOffsets.size()];
+    if (blockIndexes.length == 0) {
+      return dataChunks;
+    }
+    MeasureColumnDataChunk[] groupChunk = null;
+    int index = 0;
+    for (int i = 0; i < blockIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk = readMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    // NOTE(review): as in the dimension reader, this assumes the last column, when
+    // requested, appears as a single-element range; otherwise the group branch would
+    // access measureColumnChunkOffsets.get(size()) -- confirm callers
+    if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
+      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+          readMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+    } else {
+      groupChunk = readMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+          blockIndexes[blockIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockIndexes[blockIndexes.length - 1][0];
+           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Method to read the blocks data based on block index
+   *
+   * @param fileReader file reader to read the blocks
+   * @param blockIndex block to be read
+   * @return measure data chunk
+   * @throws IOException if reading from the file fails
+   */
+  @Override public MeasureColumnDataChunk readMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+    DataChunk2 measureColumnChunk = null;
+    byte[] measureDataChunk = null;
+    byte[] data = null;
+    // current read position inside the buffer that holds the chunk's data page
+    int copyPoint = 0;
+    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+      // last column: no next offset exists, so read the metadata header first and
+      // use the data page length from it for the second read; copyPoint stays 0
+      // because 'data' contains only the page
+      measureDataChunk = fileReader
+          .readByteArray(filePath, measureColumnChunkOffsets.get(blockIndex),
+              measureColumnChunkLength.get(blockIndex));
+      measureColumnChunk = CarbonUtil
+          .readDataChunk(measureDataChunk, copyPoint, measureColumnChunkLength.get(blockIndex));
+      data = fileReader.readByteArray(filePath,
+          measureColumnChunkOffsets.get(blockIndex) + measureColumnChunkLength.get(blockIndex),
+          measureColumnChunk.data_page_length);
+    } else {
+      // non-last column: read header + page in one IO using the next chunk's offset,
+      // then advance copyPoint past the header inside the buffer
+      long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
+      data = fileReader.readByteArray(filePath, currentMeasureOffset,
+          (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset));
+      measureColumnChunk =
+          CarbonUtil.readDataChunk(data, copyPoint, measureColumnChunkLength.get(blockIndex));
+      copyPoint += measureColumnChunkLength.get(blockIndex);
+    }
+    // deserialize all encoder metas stored with this chunk to build the compression model
+    List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+    for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
+      valueEncodeMeta.add(
+          CarbonUtil.deserializeEncoderMeta(measureColumnChunk.getEncoder_meta().get(i).array()));
+    }
+    WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+
+    ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+
+    // uncompress
+    values.uncompress(compressionModel.getConvertedDataType()[0], data,
+        copyPoint, measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+            compressionModel.getMaxValue()[0]);
+
+    CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+
+    // set the data chunk
+    datChunk.setMeasureDataHolder(measureDataHolder);
+
+    // set the null value indexes
+    datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+    return datChunk;
+  }
+
+  /**
+   * Below method will be used to read the measure chunks in group. This is
+   * to enhance the IO performance. Will read the data from start index to end
+   * index(including)
+   *
+   * @param fileReader      stream used for reading
+   * @param startBlockIndex start block index
+   * @param endBlockIndex   end block index (must not be the last column, since
+   *                        measureColumnChunkOffsets.get(endBlockIndex + 1) is read below)
+   * @return measure column chunk array
+   * @throws IOException if reading from the file fails
+   */
+  private MeasureColumnDataChunk[] readMeasureChunksInGroup(FileHolder fileReader,
+      int startBlockIndex, int endBlockIndex) throws IOException {
+    // single IO covering all requested chunks: header + page of each chunk back to back
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+    byte[] data = fileReader.readByteArray(filePath, currentMeasureOffset,
+        (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+    MeasureColumnDataChunk[] dataChunks =
+        new MeasureColumnDataChunk[endBlockIndex - startBlockIndex + 1];
+    MeasureColumnDataChunk dataChunk = null;
+    int index = 0;
+    int copyPoint = 0;
+    DataChunk2 measureColumnChunk = null;
+    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+      dataChunk = new MeasureColumnDataChunk();
+      measureColumnChunk =
+          CarbonUtil.readDataChunk(data, copyPoint, measureColumnChunkLength.get(i));
+      copyPoint += measureColumnChunkLength.get(i);
+      // deserialize the encoder metas of this chunk to build the compression model
+      List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+      for (int j = 0; j < measureColumnChunk.getEncoder_meta().size(); j++) {
+        valueEncodeMeta.add(
+            CarbonUtil.deserializeEncoderMeta(measureColumnChunk.getEncoder_meta().get(j).array()));
+      }
+      WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+
+      ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+
+      // uncompress
+      values.uncompress(compressionModel.getConvertedDataType()[0], data, copyPoint,
+              measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+              compressionModel.getMaxValue()[0]);
+
+      CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+
+      // advance past the data page to the next chunk's header
+      copyPoint += measureColumnChunk.data_page_length;
+      // set the data chunk
+      dataChunk.setMeasureDataHolder(measureDataHolder);
+
+      // set the null value indexes
+      dataChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+      dataChunks[index++] = dataChunk;
+    }
+    return dataChunks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
new file mode 100644
index 0000000..e1ae9b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeFixedLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeVariableLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeFixedLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeVariableLengthDimesionDataChunkStore;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Below class will be used to get the dimension store type
+ */
+public class DimensionChunkStoreFactory {
+
+  /**
+   * singleton store factory instance
+   */
+  public static final DimensionChunkStoreFactory INSTANCE = new DimensionChunkStoreFactory();
+
+  /**
+   * true when unsafe storage is enabled through carbon properties;
+   * resolved once at class-load time, so later property changes have no effect
+   */
+  private static final boolean isUnsafe;
+
+  static {
+    isUnsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
+            CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE));
+  }
+
+  // private constructor: access only through the INSTANCE singleton
+  private DimensionChunkStoreFactory() {
+
+  }
+
+  /**
+   * Below method will be used to get the dimension store type
+   *
+   * @param columnValueSize size in bytes of each value (used only by the fixed-length stores)
+   * @param isInvertedIndex whether an inverted index is present for this column
+   * @param numberOfRows    number of rows in the chunk
+   * @param totalSize       total size of data (used only by the unsafe store variants)
+   * @param storeType       fixed-length or variable-length store
+   * @return dimension store backed by unsafe storage when enabled, safe (on-heap) otherwise
+   */
+  public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize,
+      boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType) {
+
+    if (isUnsafe) {
+      if (storeType == DimensionStoreType.FIXEDLENGTH) {
+        return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, columnValueSize,
+            isInvertedIndex, numberOfRows);
+      } else {
+        return new UnsafeVariableLengthDimesionDataChunkStore(totalSize, isInvertedIndex,
+            numberOfRows);
+      }
+
+    } else {
+      if (storeType == DimensionStoreType.FIXEDLENGTH) {
+        return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize);
+      } else {
+        return new SafeVariableLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
+      }
+    }
+  }
+
+  /**
+   * dimension store layout: fixed-length values or variable-length values
+   */
+  public enum DimensionStoreType {
+    FIXEDLENGTH, VARIABLELENGTH;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
new file mode 100644
index 0000000..9f28df3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store;
+
+/**
+ * Interface responsibility is to store dimension data in memory.
+ * storage can be on heap or offheap.
+ */
+public interface DimensionDataChunkStore {
+
+  /**
+   * Below method will be used to put the rows and its metadata in memory
+   * (heap or off-heap depending on the implementation)
+   *
+   * @param invertedIndex        inverted index to be stored
+   * @param invertedIndexReverse inverted index reverse to be stored
+   * @param data                 data to be stored
+   */
+  void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] data);
+
+  /**
+   * Below method will be used to get the row
+   * based on row id passed
+   *
+   * @param rowId row id of the row to fetch
+   * @return row bytes
+   */
+  byte[] getRow(int rowId);
+
+  /**
+   * Below method will be used to fill the row values to buffer array
+   *
+   * @param rowId  row id of the data to be filled
+   * @param buffer buffer in which data will be filled
+   * @param offset offset in the buffer at which to start writing
+   */
+  void fillRow(int rowId, byte[] buffer, int offset);
+
+  /**
+   * Below method will be used to get the inverted index
+   *
+   * @param rowId row id
+   * @return inverted index based on row id passed
+   */
+  int getInvertedIndex(int rowId);
+
+  /**
+   * Below method will be used to get the surrogate key
+   * based on the row id passed
+   *
+   * @param rowId row id
+   * @return surrogate key
+   */
+  int getSurrogate(int rowId);
+
+  /**
+   * @return size of each column value
+   */
+  int getColumnValueSize();
+
+  /**
+   * @return whether column was explicitly sorted or not
+   */
+  boolean isExplicitSorted();
+
+  /**
+   * Below method will be used to free the memory occupied by
+   * the column chunk
+   */
+  void freeMemory();
+
+  /**
+   * to compare the stored row at the given index with a byte array
+   *
+   * @param index        row index of the stored value forming the first operand
+   * @param compareValue value to be compared against
+   * @return compare result
+   */
+  int compareTo(int index, byte[] compareValue);
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
new file mode 100644
index 0000000..cb2352f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeByteMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeDoubleMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeIntMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeLongMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeShortMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeByteMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeDoubleMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeIntMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeLongMeasureChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeShortMeasureChunkStore;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Factory class for getting the measure store type
+ */
+public class MeasureChunkStoreFactory {
+
+  /**
+   * singleton factory instance
+   */
+  public static final MeasureChunkStoreFactory INSTANCE = new MeasureChunkStoreFactory();
+
+  /**
+   * true when unsafe storage is enabled through carbon properties;
+   * resolved once at class-load time, so later property changes have no effect
+   */
+  private static final boolean isUnsafe;
+
+  static {
+    isUnsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
+            CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE));
+  }
+
+  // private constructor: access only through the INSTANCE singleton
+  private MeasureChunkStoreFactory() {
+  }
+
+  /**
+   * Below method will be used to get the measure data chunk store based on data type.
+   * Any data type not explicitly listed falls back to the double store.
+   *
+   * @param dataType     data type
+   * @param numberOfRows number of rows
+   * @return measure chunk store (unsafe variant when enabled, safe/on-heap otherwise)
+   */
+  public MeasureDataChunkStore getMeasureDataChunkStore(DataType dataType, int numberOfRows) {
+    if (!isUnsafe) {
+      switch (dataType) {
+        case DATA_BYTE:
+          return new SafeByteMeasureChunkStore(numberOfRows);
+        case DATA_SHORT:
+          return new SafeShortMeasureChunkStore(numberOfRows);
+        case DATA_INT:
+          return new SafeIntMeasureChunkStore(numberOfRows);
+        case DATA_LONG:
+          return new SafeLongMeasureChunkStore(numberOfRows);
+        case DATA_DOUBLE:
+          return new SafeDoubleMeasureChunkStore(numberOfRows);
+        default:
+          return new SafeDoubleMeasureChunkStore(numberOfRows);
+      }
+    } else {
+      switch (dataType) {
+        case DATA_BYTE:
+          return new UnsafeByteMeasureChunkStore(numberOfRows);
+        case DATA_SHORT:
+          return new UnsafeShortMeasureChunkStore(numberOfRows);
+        case DATA_INT:
+          return new UnsafeIntMeasureChunkStore(numberOfRows);
+        case DATA_LONG:
+          return new UnsafeLongMeasureChunkStore(numberOfRows);
+        case DATA_DOUBLE:
+          return new UnsafeDoubleMeasureChunkStore(numberOfRows);
+        default:
+          return new UnsafeDoubleMeasureChunkStore(numberOfRows);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureDataChunkStore.java
new file mode 100644
index 0000000..f85679e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureDataChunkStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store;
+
+import java.math.BigDecimal;
+
+/**
+ * Responsibility is store the measure data in memory,
+ * memory can be on heap or offheap based on the user configuration
+ */
+public interface MeasureDataChunkStore<T> {
+
+  /**
+   * Below method will be used to put the data to memory
+   *
+   * @param data data to be stored (type depends on the concrete store, e.g. a primitive array)
+   */
+  void putData(T data);
+
+  /**
+   * to get byte value
+   *
+   * @param index row index of the value
+   * @return byte value based on index
+   */
+  byte getByte(int index);
+
+  /**
+   * to get the short value
+   *
+   * @param index row index of the value
+   * @return short value based on index
+   */
+  short getShort(int index);
+
+  /**
+   * to get the int value
+   *
+   * @param index row index of the value
+   * @return int value based on index
+   */
+  int getInt(int index);
+
+  /**
+   * to get the long value
+   *
+   * @param index row index of the value
+   * @return long value based on index
+   */
+  long getLong(int index);
+
+  /**
+   * to get the double value
+   *
+   * @param index row index of the value
+   * @return double value based on index
+   */
+  double getDouble(int index);
+
+  /**
+   * To get the bigdecimal value
+   *
+   * @param index row index of the value
+   * @return bigdecimal value based on index
+   */
+  BigDecimal getBigDecimal(int index);
+
+  /**
+   * To free the occupied memory
+   */
+  void freeMemory();
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
new file mode 100644
index 0000000..ad561e9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
+
+/**
+ * Responsibility is to store dimension data
+ */
+public abstract class SafeAbsractDimensionDataChunkStore implements DimensionDataChunkStore {
+  // NOTE(review): class name has a typo ("Absract"); renaming would touch all subclasses,
+  // so it is only flagged here.
+
+  /**
+   * data chunk for dimension column
+   */
+  protected byte[] data;
+
+  /**
+   * inverted index
+   */
+  protected int[] invertedIndex;
+
+  /**
+   * inverted index reverse
+   */
+  protected int[] invertedIndexReverse;
+
+  /**
+   * to check whether dimension column was explicitly sorted or not
+   */
+  protected boolean isExplictSorted;
+
+  /**
+   * Constructor
+   *
+   * @param isInvertedIdex whether an inverted index is present for this column
+   */
+  public SafeAbsractDimensionDataChunkStore(boolean isInvertedIdex) {
+    this.isExplictSorted = isInvertedIdex;
+  }
+
+  /**
+   * Below method will be used to put the rows and its metadata in memory.
+   * This store keeps on-heap references only; the arrays are not copied.
+   *
+   * @param invertedIndex        inverted index to be stored
+   * @param invertedIndexReverse inverted index reverse to be stored
+   * @param data                 data to be stored
+   */
+  @Override public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
+      final byte[] data) {
+    this.data = data;
+    this.invertedIndex = invertedIndex;
+    this.invertedIndexReverse = invertedIndexReverse;
+  }
+
+  /**
+   * Below method will be used to free the memory occupied by the column chunk
+   */
+  @Override public void freeMemory() {
+    // do nothing as GC will take care of freeing memory
+  }
+
+  /**
+   * Below method will be used to get the inverted index
+   *
+   * @param rowId row id
+   * @return inverted index based on row id passed
+   */
+  @Override public int getInvertedIndex(int rowId) {
+    return invertedIndex[rowId];
+  }
+
+  /**
+   * Below method will be used to get the surrogate key based on the row
+   * id passed; only fixed-length subclasses can support this.
+   *
+   * @param rowId row id
+   * @return surrogate key
+   */
+  @Override public int getSurrogate(int rowId) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * @return size of each column value; only fixed-length subclasses can support this
+   */
+  @Override public int getColumnValueSize() {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * @return whether column was explicitly sorted or not
+   */
+  @Override public boolean isExplicitSorted() {
+    return isExplictSorted;
+  }
+
+  /**
+   * Below method will be used to fill the row values to data array;
+   * subclasses that support direct filling must override this.
+   *
+   * @param rowId  row id of the data to be filled
+   * @param data   buffer in which data will be filled
+   * @param offset offset in the buffer at which to start writing
+   */
+  @Override public void fillRow(int rowId, byte[] data, int offset) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractMeasureDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractMeasureDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractMeasureDataChunkStore.java
new file mode 100644
index 0000000..434dc2d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbstractMeasureDataChunkStore.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+
+/**
+ * Responsibility is store the measure data in memory,
+ */
+public abstract class SafeAbstractMeasureDataChunkStore<T> implements
+    MeasureDataChunkStore<T> {
+  // Base class for the on-heap measure stores: every typed accessor throws
+  // UnsupportedOperationException, so each subclass overrides only the accessor
+  // that matches its stored data type.
+
+  /**
+   * number of rows
+   */
+  protected int numberOfRows;
+
+  public SafeAbstractMeasureDataChunkStore(int numberOfRows) {
+    this.numberOfRows = numberOfRows;
+  }
+
+  /**
+   * to get the byte value; supported only by the byte store subclass
+   *
+   * @param index row index of the value
+   * @return byte value based on index
+   */
+  @Override
+  public byte getByte(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the short value; supported only by the short store subclass
+   *
+   * @param index row index of the value
+   * @return short value based on index
+   */
+  @Override
+  public short getShort(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the int value; supported only by the int store subclass
+   *
+   * @param index row index of the value
+   * @return int value based on index
+   */
+  @Override
+  public int getInt(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the long value; supported only by the long store subclass
+   *
+   * @param index row index of the value
+   * @return long value based on index
+   */
+  @Override
+  public long getLong(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the double value; supported only by the double store subclass
+   *
+   * @param index row index of the value
+   * @return double value based on index
+   */
+  @Override
+  public double getDouble(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * To get the bigdecimal value; supported only by a bigdecimal store subclass
+   *
+   * @param index row index of the value
+   * @return bigdecimal value based on index
+   */
+  @Override
+  public BigDecimal getBigDecimal(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * To free the occupied memory
+   */
+  @Override
+  public void freeMemory() {
+    // do nothing as GC will take care of freeing the memory
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeByteMeasureChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeByteMeasureChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeByteMeasureChunkStore.java
new file mode 100644
index 0000000..a2c9b06
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeByteMeasureChunkStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+/**
+ * Responsible for storing Byte array data to memory.
+ */
+public class SafeByteMeasureChunkStore extends
+    SafeAbstractMeasureDataChunkStore<byte[]> {
+
+  /**
+   * byte values, one per row
+   */
+  private byte[] data;
+
+  public SafeByteMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+  }
+
+  /**
+   * Below method will be used to put byte array data to memory.
+   * Keeps a reference to the array; no defensive copy is made.
+   *
+   * @param data byte values to store
+   */
+  @Override
+  public void putData(byte[] data) {
+    this.data = data;
+  }
+
+  /**
+   * to get the byte value
+   *
+   * @param index row index of the value
+   * @return byte value based on index
+   */
+  @Override
+  public byte getByte(int index) {
+    return this.data[index];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeDoubleMeasureChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeDoubleMeasureChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeDoubleMeasureChunkStore.java
new file mode 100644
index 0000000..051b866
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeDoubleMeasureChunkStore.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+/**
+ * Below class will be used to store the measure values of double data type
+ */
+public class SafeDoubleMeasureChunkStore extends
+    SafeAbstractMeasureDataChunkStore<double[]> {
+
+  /**
+   * double values, one per row
+   */
+  private double[] data;
+
+  public SafeDoubleMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+  }
+
+  /**
+   * Below method will be used to store double array data.
+   * Keeps a reference to the array; no defensive copy is made.
+   *
+   * @param data double values to store
+   */
+  @Override
+  public void putData(double[] data) {
+    this.data = data;
+  }
+
+  /**
+   * to get the double value
+   *
+   * @param index row index of the value
+   * @return double value based on index
+   */
+  @Override
+  public double getDouble(int index) {
+    return this.data[index];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
new file mode 100644
index 0000000..783717f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Below class will be used to store fixed length dimension data
+ */
+public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimensionDataChunkStore {
+
+  /**
+   * Size of each value
+   */
+  private int columnValueSize;
+
+  public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) {
+    super(isInvertedIndex);
+    this.columnValueSize = columnValueSize;
+  }
+
+  /**
+   * Below method will be used to get the row based on the row id passed
+   *
+   * @param rowId row id
+   * @return row bytes (columnValueSize bytes copied out of the data chunk)
+   */
+  @Override public byte[] getRow(int rowId) {
+    // if column was explicitly sorted we need to get the rowid based inverted index reverse
+    if (isExplictSorted) {
+      rowId = invertedIndexReverse[rowId];
+    }
+    // creating a row
+    byte[] row = new byte[columnValueSize];
+    //copy the row from data chunk based on offset
+    // offset position will be index * each column value length
+    System.arraycopy(this.data, rowId * columnValueSize, row, 0, columnValueSize);
+    return row;
+  }
+
+  /**
+   * Below method will be used to get the surrogate key based on the row
+   * id passed
+   *
+   * @param index row id
+   * @return surrogate key
+   */
+  @Override public int getSurrogate(int index) {
+    // if column was explicitly sorted we need to get the rowid based inverted index reverse
+    if (isExplictSorted) {
+      index = invertedIndexReverse[index];
+    }
+    // assemble the surrogate from columnValueSize stored bytes,
+    // most significant byte first
+    int startOffsetOfData = index * columnValueSize;
+    int surrogate = 0;
+    for (int i = 0; i < columnValueSize; i++) {
+      surrogate <<= 8;
+      surrogate ^= data[startOffsetOfData] & 0xFF;
+      startOffsetOfData++;
+    }
+    return surrogate;
+  }
+
+  /**
+   * Below method will be used to fill the row values to buffer array
+   *
+   * @param rowId  row id of the data to be filled
+   * @param buffer buffer in which data will be filled
+   * @param offset offset in the buffer at which to start writing
+   */
+  @Override public void fillRow(int rowId, byte[] buffer, int offset) {
+    // if column was explicitly sorted we need to get the rowid based inverted index reverse
+    if (isExplictSorted) {
+      rowId = invertedIndexReverse[rowId];
+    }
+    //copy the row from memory block based on offset
+    // offset position will be index * each column value length
+    System.arraycopy(data, rowId * columnValueSize, buffer, offset, columnValueSize);
+  }
+
+  /**
+   * @return size of each column value
+   */
+  @Override public int getColumnValueSize() {
+    return columnValueSize;
+  }
+
+  /**
+   * to compare the stored row at the given index with a byte array
+   *
+   * @param index        row index of the stored value forming the first operand
+   * @param compareValue value to be compared against
+   * @return compare result
+   */
+  @Override public int compareTo(int index, byte[] compareValue) {
+    return ByteUtil.UnsafeComparer.INSTANCE
+        .compareTo(data, index * columnValueSize, columnValueSize, compareValue, 0,
+            columnValueSize);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeIntMeasureChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeIntMeasureChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeIntMeasureChunkStore.java
new file mode 100644
index 0000000..656e8f8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeIntMeasureChunkStore.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+/**
+ * Responsible for storing int array data to memory.
+ */
+public class SafeIntMeasureChunkStore extends
+    SafeAbstractMeasureDataChunkStore<int[]> {
+
+  /**
+   * data
+   */
+  private int[] data;
+
+  public SafeIntMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+  }
+
+  /**
+   * Below method will be used to put int array data to memory
+   *
+   * @param data
+   */
+  @Override
+  public void putData(int[] data) {
+    this.data = data;
+  }
+
+  /**
+   * to get the int value
+   *
+   * @param index
+   * @return int value based on index
+   */
+  @Override
+  public int getInt(int index) {
+    return this.data[index];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeLongMeasureChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeLongMeasureChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeLongMeasureChunkStore.java
new file mode 100644
index 0000000..656dced
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeLongMeasureChunkStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+/**
+ * Below class will be used to store the measure values of long data type
+ *
+ */
+public class SafeLongMeasureChunkStore extends
+    SafeAbstractMeasureDataChunkStore<long[]> {
+
+  /**
+   * data
+   */
+  private long[] data;
+
+  public SafeLongMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+  }
+
+  /**
+   * Below method will be used to store long array data
+   *
+   * @param data
+   */
+  @Override
+  public void putData(long[] data) {
+    this.data = data;
+  }
+
+  /**
+   * to get the long value
+   *
+   * @param index
+   * @return long value based on index
+   */
+  @Override
+  public long getLong(int index) {
+    return this.data[index];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeShortMeasureChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeShortMeasureChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeShortMeasureChunkStore.java
new file mode 100644
index 0000000..3220bd0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeShortMeasureChunkStore.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+/**
+ * Below class will be used to store the measure values of short data type
+ *
+ */
+public class SafeShortMeasureChunkStore extends
+    SafeAbstractMeasureDataChunkStore<short[]> {
+
+  /**
+   * data
+   */
+  private short[] data;
+
+  public SafeShortMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+  }
+
+  /**
+   * Below method will be used to put short array data
+   *
+   * @param data
+   */
+  @Override
+  public void putData(short[] data) {
+    this.data = data;
+  }
+
+  /**
+   * to get the short value
+   *
+   * @param index
+   * @return shot value based on index
+   */
+  @Override
+  public short getShort(int index) {
+    return data[index];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
new file mode 100644
index 0000000..8964948
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Below class is responsible to store variable length dimension data chunk in
+ * memory Memory occupied can be on heap or offheap using unsafe interface
+ */
+public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimensionDataChunkStore {
+
+  /**
+   * total number of rows
+   */
+  private int numberOfRows;
+
+  /**
+   * offset of the data this will be used during search, as we can directly jump
+   * to particular location
+   */
+  private int[] dataOffsets;
+
+  public SafeVariableLengthDimensionDataChunkStore(boolean isInvertedIndex, int numberOfRows) {
+    super(isInvertedIndex);
+    this.numberOfRows = numberOfRows;
+    this.dataOffsets = new int[numberOfRows];
+  }
+
+  /**
+   * Below method will be used to put the rows and its metadata in offheap
+   *
+   * @param invertedIndex        inverted index to be stored
+   * @param invertedIndexReverse inverted index reverse to be stored
+   * @param data                 data to be stored
+   */
+  @Override public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
+      byte[] data) {
+    // first put the data, inverted index and reverse inverted index to memory
+    super.putArray(invertedIndex, invertedIndexReverse, data);
+    // As data is of variable length and data format is
+    // <length in short><data><length in short><data>
+    // we need to store offset of each data so data can be accessed directly
+    // for example:
+    //data = {0,5,1,2,3,4,5,0,6,0,1,2,3,4,5,0,2,8,9}
+    //so value stored in offset will be position of actual data
+    // [2,9,17]
+    // to store this value we need to get the actual data length + 2 bytes used for storing the
+    // length
+
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    // as first position will be start from 2 byte as data is stored first in the memory block
+    // we need to skip first two bytes this is because first two bytes will be length of the data
+    // which we have to skip
+    dataOffsets[0] = CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    // creating a byte buffer which will wrap the length of the row
+    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    for (int i = 1; i < numberOfRows; i++) {
+      buffer.put(data, startOffset, CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+      buffer.flip();
+      // so current row position will be
+      // previous row length + 2 bytes used for storing previous row data
+      startOffset += buffer.getShort() + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      // as same byte buffer is used to avoid creating many byte buffer for each row
+      // we need to clear the byte buffer
+      buffer.clear();
+      dataOffsets[i] = startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    }
+  }
+
+  @Override public byte[] getRow(int rowId) {
+    // if column was explicitly sorted we need to get the rowid based inverted index reverse
+    if (isExplictSorted) {
+      rowId = invertedIndexReverse[rowId];
+    }
+    // now to get the row from memory block we need to do following thing
+    // 1. first get the current offset
+    // 2. if it's not a last row- get the next row offset
+    // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+    // else subtract the current row offset with complete data
+    // length get the offset of set of data
+    int currentDataOffset = dataOffsets[rowId];
+    short length = 0;
+    // calculating the length of data
+    if (rowId < numberOfRows - 1) {
+      length = (short) (dataOffsets[rowId + 1] - (currentDataOffset
+          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+    } else {
+      // for last record
+      length = (short) (this.data.length - currentDataOffset);
+    }
+    byte[] currentRowData = new byte[length];
+    System.arraycopy(data, currentDataOffset, currentRowData, 0, length);
+    return currentRowData;
+  }
+
+  @Override public int compareTo(int index, byte[] compareValue) {
+    // now to get the row from memory block we need to do following thing
+    // 1. first get the current offset
+    // 2. if it's not a last row- get the next row offset
+    // Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+    // else subtract the current row offset with complete data
+    // length
+
+    // get the offset of set of data
+    int currentDataOffset = dataOffsets[index];
+    short length = 0;
+    // calculating the length of data
+    if (index < numberOfRows - 1) {
+      length = (short) (dataOffsets[index + 1] - (currentDataOffset
+          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+    } else {
+      // for last record
+      length = (short) (this.data.length - currentDataOffset);
+    }
+    return ByteUtil.UnsafeComparer.INSTANCE
+        .compareTo(data, currentDataOffset, length, compareValue, 0, compareValue.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
new file mode 100644
index 0000000..b4e6a9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.unsafe;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
/**
 * Responsibility is to store dimension data in memory allocated through
 * MemoryAllocatorFactory; storage can be on heap or offheap depending on the
 * allocator. Data, inverted index and reverse inverted index are laid out
 * back-to-back in a single memory block.
 */
public abstract class UnsafeAbstractDimensionDataChunkStore implements DimensionDataChunkStore {

  /**
   * memory block for data page
   */
  protected MemoryBlock dataPageMemoryBlock;

  /**
   * to check whether dimension column was explicitly sorted or not
   */
  protected boolean isExplicitSorted;

  /**
   * is memory released
   */
  protected boolean isMemoryReleased;

  /**
   * length of the actual data
   */
  protected int dataLength;

  /**
   * offset (from the block base) at which the reverse inverted index starts
   */
  protected long invertedIndexReverseOffset;

  /**
   * to validate whether data is already kept in memory or not
   */
  protected boolean isMemoryOccupied;

  /**
   * Constructor: allocates the backing memory block up front.
   *
   * @param totalSize      total size of the data to be kept
   * @param isInvertedIdex is inverted index present
   * @param numberOfRows   total number of rows
   *                       NOTE(review): numberOfRows is accepted but not used in
   *                       this class — presumably consumed by subclasses; confirm
   */
  public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
      int numberOfRows) {
    // allocating the data page
    this.dataPageMemoryBlock =
        MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(totalSize);
    this.isExplicitSorted = isInvertedIdex;
  }

  /**
   * Below method will be used to put the rows and its metadata in offheap.
   * Layout after this call: [data bytes][inverted index ints][reverse inverted
   * index ints], the latter two only when the column was explicitly sorted.
   *
   * @param invertedIndex        inverted index to be stored
   * @param invertedIndexReverse inverted index reverse to be stored
   * @param data                 data to be stored
   */
  @Override public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
      final byte[] data) {
    // NOTE(review): isMemoryOccupied is never set to true in this class, so this
    // assert cannot fire here — verify subclasses maintain the flag
    assert (!isMemoryOccupied);
    this.dataLength = data.length;
    this.invertedIndexReverseOffset = dataLength;
    if (isExplicitSorted) {
      this.invertedIndexReverseOffset +=
          invertedIndex.length * CarbonCommonConstants.INT_SIZE_IN_BYTE;
    }
    // copy the data to memory
    CarbonUnsafe.unsafe
        .copyMemory(data, CarbonUnsafe.BYTE_ARRAY_OFFSET, dataPageMemoryBlock.getBaseObject(),
            dataPageMemoryBlock.getBaseOffset(), this.dataLength);
    // if inverted index is present then copy the inverted index
    // and reverse inverted index to memory, directly after the data bytes
    if (isExplicitSorted) {
      CarbonUnsafe.unsafe.copyMemory(invertedIndex, CarbonUnsafe.INT_ARRAY_OFFSET,
          dataPageMemoryBlock.getBaseObject(), dataPageMemoryBlock.getBaseOffset() + dataLength,
          invertedIndex.length * CarbonCommonConstants.INT_SIZE_IN_BYTE);
      CarbonUnsafe.unsafe.copyMemory(invertedIndexReverse, CarbonUnsafe.INT_ARRAY_OFFSET,
          dataPageMemoryBlock.getBaseObject(),
          dataPageMemoryBlock.getBaseOffset() + this.invertedIndexReverseOffset,
          invertedIndexReverse.length * CarbonCommonConstants.INT_SIZE_IN_BYTE);
    }
  }

  /**
   * Below method will be used to free the memory occupied by the column chunk.
   * Idempotent: subsequent calls are no-ops once memory is released.
   */
  @Override public void freeMemory() {
    if (isMemoryReleased) {
      return;
    }
    // free data page memory
    MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().free(dataPageMemoryBlock);
    isMemoryReleased = true;
    this.dataPageMemoryBlock = null;
    this.isMemoryOccupied = false;
  }

  /**
   * Below method will be used to get the inverted index; reads the int stored
   * at position rowId in the inverted-index region that starts right after the
   * data bytes.
   *
   * @param rowId row id
   * @return inverted index based on row id passed
   */
  @Override public int getInvertedIndex(int rowId) {
    return CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
        dataPageMemoryBlock.getBaseOffset() + dataLength + (rowId
            * CarbonCommonConstants.INT_SIZE_IN_BYTE));
  }

  /**
   * Below method will be used to get the surrogate key of the based on the row
   * id passed; not supported at this level, subclasses for fixed-length
   * dictionary data override it.
   *
   * @param rowId row id
   * @return surrogate key
   */
  @Override public int getSurrogate(int rowId) {
    throw new UnsupportedOperationException("Operation not supported");
  }

  /**
   * @return size of each column value; not supported at this level
   */
  @Override public int getColumnValueSize() {
    throw new UnsupportedOperationException("Operation not supported");
  }

  /**
   * @return whether column was explicitly sorted or not
   */
  @Override public boolean isExplicitSorted() {
    return isExplicitSorted;
  }

  /**
   * Below method will be used to fill the row values to data array; not
   * supported at this level.
   *
   * @param rowId  row id of the data to be filled
   * @param data   buffer in which data will be filled
   * @param offset offset in the buffer
   */
  @Override public void fillRow(int rowId, byte[] data, int offset) {
    throw new UnsupportedOperationException("Operation not supported");
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractMeasureDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractMeasureDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractMeasureDataChunkStore.java
new file mode 100644
index 0000000..d93a6e7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractMeasureDataChunkStore.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.unsafe;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+/**
+ * Responsibility is store the measure data in memory, memory can be on heap or
+ * offheap based on the user configuration using unsafe interface
+ */
+public abstract class UnsafeAbstractMeasureDataChunkStore<T> implements MeasureDataChunkStore<T> {
+
+  /**
+   * memory block
+   */
+  protected MemoryBlock dataPageMemoryBlock;
+
+  /**
+   * number of rows
+   */
+  protected int numberOfRows;
+
+  /**
+   * to check memory is released or not
+   */
+  protected boolean isMemoryReleased;
+
+  /**
+   * to check memory is occupied or not
+   */
+  protected boolean isMemoryOccupied;
+
+  public UnsafeAbstractMeasureDataChunkStore(int numberOfRows) {
+    this.numberOfRows = numberOfRows;
+  }
+
+  /**
+   * to get the byte value
+   *
+   * @param index
+   * @return byte value based on index
+   */
+  @Override public byte getByte(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the short value
+   *
+   * @param index
+   * @return short value based on index
+   */
+  @Override public short getShort(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the int value
+   *
+   * @param index
+   * @return int value based on index
+   */
+  @Override public int getInt(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the long value
+   *
+   * @param index
+   * @return long value based on index
+   */
+  @Override public long getLong(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * to get the double value
+   *
+   * @param index
+   * @return double value based on index
+   */
+  @Override public double getDouble(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * To get the bigdecimal value
+   *
+   * @param index
+   * @return bigdecimal value based on index
+   */
+  @Override public BigDecimal getBigDecimal(int index) {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  /**
+   * To free the occupied memory
+   */
+  @Override public void freeMemory() {
+    if (isMemoryReleased) {
+      return;
+    }
+    MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().free(dataPageMemoryBlock);
+    isMemoryReleased = true;
+    this.dataPageMemoryBlock = null;
+    this.isMemoryOccupied = false;
+  }
+
+}



Mime
View raw message