carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-2607][Complex Column Enhancements] Complex Primitive DataType Adaptive Encoding
Date Tue, 10 Jul 2018 14:28:43 GMT
[CARBONDATA-2607][Complex Column Enhancements] Complex Primitive DataType Adaptive Encoding

This PR improves how complex types are stored so that reading them becomes more efficient.

The changes are:

Primitive types inside complex types are now stored as separate pages. Previously a complex column was stored as a single byte array column page. Now all sub-levels inside the complex data types are stored as separate pages with their respective datatypes.

No Dictionary Primitive DataTypes inside Complex Columns will be processed through Adaptive Encoding. Previously only snappy compression was applied.

For all no-dictionary primitive datatypes inside a complex column, only the value is saved, except String and Varchar, which are saved as ByteArray. Previously all sub-levels were saved in Length-and-Value format inside a single byte array. Currently only Struct and Array type column pages are saved as ByteArray. All other primitives except String and Varchar are saved with their respective fixed datatype length.

Added support in the Safe and Unsafe fixed-length column pages for a dynamically growing array implementation. This is needed to support the Array datatype.

Co-authored-by: sounakr <sounakr@gmail.com>

This closes #2417


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/438b4421
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/438b4421
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/438b4421

Branch: refs/heads/master
Commit: 438b4421eebe360c216c7a9251a46fcee8687074
Parents: 75126c6
Author: ajantha-bhat <ajanthabhat@gmail.com>
Authored: Tue Jul 3 19:05:09 2018 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Tue Jul 10 19:58:08 2018 +0530

----------------------------------------------------------------------
 ...mpressedDimensionChunkFileBasedReaderV3.java |  24 +-
 .../AbstractMeasureChunkReaderV2V3Format.java   |  21 -
 ...CompressedMeasureChunkFileBasedReaderV2.java |   3 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |   3 +-
 ...essedMsrChunkFileBasedPageLevelReaderV3.java |   3 +-
 .../chunk/store/ColumnPageWrapper.java          |  90 ++-
 .../core/datastore/page/ColumnPage.java         |   8 +-
 .../core/datastore/page/ComplexColumnPage.java  | 129 +++--
 .../page/FallbackColumnPageEncoder.java         |   4 +-
 .../datastore/page/SafeFixLengthColumnPage.java |  79 ++-
 .../page/UnsafeFixLengthColumnPage.java         | 130 ++++-
 .../page/encoding/ColumnPageEncoder.java        |  54 +-
 .../page/encoding/ColumnPageEncoderMeta.java    |   4 +-
 .../page/encoding/DefaultEncodingFactory.java   |  52 +-
 .../page/encoding/EncodingFactory.java          |   6 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |   2 +-
 .../statistics/PrimitivePageStatsCollector.java |  23 +-
 .../core/datastore/row/ComplexColumnInfo.java   |  57 ++
 .../scan/complextypes/PrimitiveQueryType.java   |   2 +-
 .../core/scan/executor/util/QueryUtil.java      |  22 +
 .../apache/carbondata/core/util/ByteUtil.java   |  10 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   3 +-
 .../carbondata/core/util/DataTypeUtil.java      |  25 +-
 .../page/encoding/TestEncodingFactory.java      |  16 +-
 .../src/test/resources/adap.csv                 |   3 +
 .../src/test/resources/adap_double1.csv         |   3 +
 .../src/test/resources/adap_double2.csv         |   3 +
 .../src/test/resources/adap_double3.csv         |   3 +
 .../src/test/resources/adap_double4.csv         |   3 +
 .../src/test/resources/adap_int1.csv            |   3 +
 .../src/test/resources/adap_int2.csv            |   3 +
 .../src/test/resources/adap_int3.csv            |   3 +
 .../complexType/TestAdaptiveComplexType.scala   | 554 +++++++++++++++++++
 .../TestAdaptiveEncodingForNullValues.scala     | 168 ++++++
 ...codingSafeColumnPageForComplexDataType.scala |  55 ++
 ...dingUnsafeColumnPageForComplexDataType.scala |  59 ++
 .../processing/datatypes/ArrayDataType.java     |  16 +-
 .../processing/datatypes/GenericDataType.java   |   7 +-
 .../processing/datatypes/PrimitiveDataType.java |  34 +-
 .../processing/datatypes/StructDataType.java    |  18 +-
 .../carbondata/processing/store/TablePage.java  |  13 +-
 .../util/CarbonDataProcessorUtil.java           |   8 +-
 42 files changed, 1523 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index eb5917b..32f84f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -223,6 +224,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       switch (encoding) {
         case DIRECT_COMPRESS:
         case DIRECT_STRING:
+        case ADAPTIVE_INTEGRAL:
+        case ADAPTIVE_DELTA_INTEGRAL:
+        case ADAPTIVE_FLOATING:
+        case ADAPTIVE_DELTA_FLOATING:
           return true;
       }
     }
@@ -234,13 +239,30 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       throws IOException, MemoryException {
     if (isEncodedWithMeta(pageMetadata)) {
       ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset);
-      return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary());
+      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
+      return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(),
+          isEncodedWithAdaptiveMeta(pageMetadata));
     } else {
       // following code is for backward compatibility
       return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
     }
   }
 
+  private boolean isEncodedWithAdaptiveMeta(DataChunk2 pageMetadata) {
+    List<Encoding> encodings = pageMetadata.getEncoders();
+    if (encodings != null && encodings.size() == 1) {
+      Encoding encoding = encodings.get(0);
+      switch (encoding) {
+        case ADAPTIVE_INTEGRAL:
+        case ADAPTIVE_DELTA_INTEGRAL:
+        case ADAPTIVE_FLOATING:
+        case ADAPTIVE_DELTA_FLOATING:
+          return true;
+      }
+    }
+    return false;
+  }
+
   private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
       ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
       MemoryException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
index 7d59d47..64d2fff 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -17,13 +17,10 @@
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
 import java.io.IOException;
-import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 
 /**
@@ -97,24 +94,6 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu
   }
 
   /**
-   * Below method will be used to convert the thrift presence meta to wrapper
-   * presence meta
-   *
-   * @param presentMetadataThrift
-   * @return wrapper presence meta
-   */
-  protected BitSet getNullBitSet(org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    final byte[] present_bit_stream = presentMetadataThrift.getPresent_bit_stream();
-    if (null != present_bit_stream) {
-      return BitSet
-          .valueOf(compressor.unCompressByte(present_bit_stream));
-    } else {
-      return new BitSet(1);
-    }
-  }
-
-  /**
    * Below method will be used to read measure chunk data in group.
    * This method will be useful to avoid multiple IO while reading the
    * data from

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 04d6e2e..9864ab8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 
@@ -125,7 +126,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     copyPoint += measureColumnChunkLength.get(blockIndex);
 
     ColumnPage page = decodeMeasure(measureRawColumnChunk, measureColumnChunk, copyPoint);
-    page.setNullBits(getNullBitSet(measureColumnChunk.presence));
+    page.setNullBits(QueryUtil.getNullBitSet(measureColumnChunk.presence));
     return page;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 8336363..e389ac6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -198,7 +199,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
         measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
     ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
-    decodedPage.setNullBits(getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
     return decodedPage;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
index 2ebaa16..052f745 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
@@ -146,7 +147,7 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
         .readByteBuffer(filePath, offset, pageMetadata.data_page_length);
 
     ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
-    decodedPage.setNullBits(getNullBitSet(pageMetadata.presence));
+    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence));
     return decodedPage;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
index 59e593a..f4712ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/ColumnPageWrapper.java
@@ -17,22 +17,34 @@
 
 package org.apache.carbondata.core.datastore.chunk.store;
 
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+
 public class ColumnPageWrapper implements DimensionColumnPage {
 
   private ColumnPage columnPage;
 
   private CarbonDictionary localDictionary;
 
-  public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary) {
+  private boolean isAdaptiveComplexPrimitivePage;
+
+  public ColumnPageWrapper(ColumnPage columnPage, CarbonDictionary localDictionary,
+      boolean isAdaptiveComplexPrimitivePage) {
     this.columnPage = columnPage;
     this.localDictionary = localDictionary;
+    this.isAdaptiveComplexPrimitivePage = isAdaptiveComplexPrimitivePage;
+
   }
 
   @Override
@@ -58,21 +70,81 @@ public class ColumnPageWrapper implements DimensionColumnPage {
     throw new UnsupportedOperationException("internal error");
   }
 
-  @Override
-  public byte[] getChunkData(int rowId) {
+  @Override public byte[] getChunkData(int rowId) {
+    ColumnType columnType = columnPage.getColumnSpec().getColumnType();
+    DataType srcDataType = columnPage.getColumnSpec().getSchemaDataType();
+    DataType targetDataType = columnPage.getDataType();
     if (null != localDictionary) {
-      return localDictionary.getDictionaryValue(CarbonUtil
-          .getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
+      return localDictionary
+          .getDictionaryValue(CarbonUtil.getSurrogateInternal(columnPage.getBytes(rowId), 0, 3));
+    } else if (columnType == ColumnType.COMPLEX_PRIMITIVE && this.isAdaptiveComplexPrimitive()) {
+      if (columnPage.getNullBits().get(rowId)) {
+        // if this row is null, return default null represent in byte array
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      }
+      if (srcDataType == DataTypes.DOUBLE || srcDataType == DataTypes.FLOAT) {
+        double doubleData = columnPage.getDouble(rowId);
+        if (srcDataType == DataTypes.FLOAT) {
+          float out = (float) doubleData;
+          return ByteUtil.toBytes(out);
+        } else {
+          return ByteUtil.toBytes(doubleData);
+        }
+      } else if (DataTypes.isDecimal(srcDataType)) {
+        throw new RuntimeException("unsupported type: " + srcDataType);
+      } else if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN) || (
+          srcDataType == DataTypes.SHORT) || (srcDataType == DataTypes.SHORT_INT) || (srcDataType
+          == DataTypes.INT) || (srcDataType == DataTypes.LONG) || (srcDataType
+          == DataTypes.TIMESTAMP)) {
+        long longData = columnPage.getLong(rowId);
+        if ((srcDataType == DataTypes.BYTE)) {
+          byte out = (byte) longData;
+          return ByteUtil.toBytes(out);
+        } else if (srcDataType == DataTypes.BOOLEAN) {
+          byte out = (byte) longData;
+          return ByteUtil.toBytes(ByteUtil.toBoolean(out));
+        } else if (srcDataType == DataTypes.SHORT) {
+          short out = (short) longData;
+          return ByteUtil.toBytes(out);
+        } else if (srcDataType == DataTypes.SHORT_INT) {
+          int out = (int) longData;
+          return ByteUtil.toBytes(out);
+        } else if (srcDataType == DataTypes.INT) {
+          int out = (int) longData;
+          return ByteUtil.toBytes(out);
+        } else {
+          // timestamp and long
+          return ByteUtil.toBytes(longData);
+        }
+      } else if ((targetDataType == DataTypes.STRING) || (targetDataType == DataTypes.VARCHAR) || (
+          targetDataType == DataTypes.BYTE_ARRAY)) {
+        return columnPage.getBytes(rowId);
+      } else {
+        throw new RuntimeException("unsupported type: " + targetDataType);
+      }
+    } else if ((columnType == ColumnType.COMPLEX_PRIMITIVE) && !this.isAdaptiveComplexPrimitive()) {
+      if ((srcDataType == DataTypes.BYTE) || (srcDataType == DataTypes.BOOLEAN)) {
+        byte[] out = new byte[1];
+        out[0] = (columnPage.getByte(rowId));
+        return out;
+      } else if (srcDataType == DataTypes.BYTE_ARRAY) {
+        return columnPage.getBytes(rowId);
+      } else {
+        throw new RuntimeException("unsupported type: " + targetDataType);
+      }
+    } else {
+      return columnPage.getBytes(rowId);
     }
-    return columnPage.getBytes(rowId);
   }
 
+
   @Override
   public int getInvertedIndex(int rowId) {
     throw new UnsupportedOperationException("internal error");
   }
 
-  @Override public int getInvertedReverseIndex(int rowId) {
+  @Override
+  public int getInvertedReverseIndex(int rowId) {
     throw new UnsupportedOperationException("internal error");
   }
 
@@ -96,4 +168,8 @@ public class ColumnPageWrapper implements DimensionColumnPage {
 
   }
 
+  public boolean isAdaptiveComplexPrimitive() {
+    return isAdaptiveComplexPrimitivePage;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 6077ddf..8745545 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -58,7 +58,7 @@ public abstract class ColumnPage {
   private final TableSpec.ColumnSpec columnSpec;
 
   // The index of the rowId whose value is null, will be set to 1
-  private BitSet nullBitSet;
+  protected BitSet nullBitSet;
 
   // statistics collector for this column page
   protected ColumnPageStatsCollector statsCollector;
@@ -193,6 +193,8 @@ public abstract class ColumnPage {
           dataType == DataTypes.FLOAT ||
           dataType == DataTypes.DOUBLE) {
         instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+      } else if (dataType == DataTypes.TIMESTAMP) {
+        instance = new UnsafeFixLengthColumnPage(columnSpec, DataTypes.LONG, pageSize);
       } else if (DataTypes.isDecimal(dataType)) {
         instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
       } else if (dataType == DataTypes.STRING
@@ -211,7 +213,7 @@ public abstract class ColumnPage {
         instance = newShortIntPage(columnSpec, new byte[pageSize * 3]);
       } else if (dataType == DataTypes.INT) {
         instance = newIntPage(columnSpec, new int[pageSize]);
-      } else if (dataType == DataTypes.LONG) {
+      } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
         instance = newLongPage(columnSpec, new long[pageSize]);
       } else if (dataType == DataTypes.FLOAT) {
         instance = newFloatPage(columnSpec, new float[pageSize]);
@@ -494,7 +496,7 @@ public abstract class ColumnPage {
   /**
    * Set null at rowId
    */
-  private void putNull(int rowId) {
+  protected void putNull(int rowId) {
     if (dataType == DataTypes.BOOLEAN) {
       putBoolean(rowId, false);
     } else if (dataType == DataTypes.BYTE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index a170c8b..7a95d23 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -20,12 +20,15 @@ package org.apache.carbondata.core.datastore.page;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.statistics.DummyStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.row.ComplexColumnInfo;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * holds the complex columndata and its children data
@@ -35,12 +38,12 @@ public class ComplexColumnPage {
   /**
    * number of columns
    */
-  private int depth;
+  private int complexColumnIndex;
 
   /**
    * type of each column
    */
-  private List<ColumnType> complexColumnType;
+  private List<ComplexColumnInfo> complexColumnInfoList;
 
   /**
    * column page for each type
@@ -52,36 +55,41 @@ public class ComplexColumnPage {
    */
   private int[] currentRowIdList;
 
-  public ComplexColumnPage(List<ColumnType> complexColumnType) {
-    this.depth = complexColumnType.size();
-    this.complexColumnType = complexColumnType;
-    this.columnPages = new ColumnPage[this.depth];
-    this.currentRowIdList = new int[depth];
+  public ComplexColumnPage(List<ComplexColumnInfo> complexColumnInfoList) {
+    this.complexColumnIndex = complexColumnInfoList.size();
+    this.complexColumnInfoList = complexColumnInfoList;
+    this.columnPages = new ColumnPage[this.complexColumnIndex];
+    this.currentRowIdList = new int[complexColumnIndex];
   }
 
   /**
    * below method will be used to initlize the column page of complex type
    * @param columnToDictMap
    * dictionary map
-   * @param columnNames
-   * list of columns
    * @param pageSize
    * number of records
    * @throws MemoryException
    * if memory is not sufficient
    */
-  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap,
-      List<String> columnNames, int pageSize) throws MemoryException {
+  public void initialize(Map<String, LocalDictionaryGenerator> columnToDictMap, int pageSize)
+      throws MemoryException {
+    DataType dataType;
     for (int i = 0; i < this.columnPages.length; i++) {
-      LocalDictionaryGenerator localDictionaryGenerator = columnToDictMap.get(columnNames.get(i));
+      LocalDictionaryGenerator localDictionaryGenerator =
+          columnToDictMap.get(complexColumnInfoList.get(i).getColumnNames());
+      TableSpec.ColumnSpec spec = getColumnSpec(i, localDictionaryGenerator);
       if (null == localDictionaryGenerator) {
-        TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
-            .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i));
-        this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
-        this.columnPages[i].setStatsCollector(new DummyStatsCollector());
+        dataType = complexColumnInfoList.get(i).getColumnDataTypes();
+        if (isColumnPageBasedOnDataType(i)) {
+          // no dictionary primitive types need adaptive encoding,
+          // hence store as actual value instead of byte array
+          this.columnPages[i] = ColumnPage.newPage(spec, dataType, pageSize);
+          this.columnPages[i].setStatsCollector(PrimitivePageStatsCollector.newInstance(dataType));
+        } else {
+          this.columnPages[i] = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+          this.columnPages[i].setStatsCollector(new DummyStatsCollector());
+        }
       } else {
-        TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
-            .newInstance(columnNames.get(i), DataTypes.BYTE_ARRAY, complexColumnType.get(i));
         this.columnPages[i] = ColumnPage
             .newLocalDictPage(spec, DataTypes.BYTE_ARRAY, pageSize, localDictionaryGenerator, true);
         this.columnPages[i].setStatsCollector(new DummyStatsCollector());
@@ -89,57 +97,92 @@ public class ComplexColumnPage {
     }
   }
 
-  /**
-   *
-   * @return depth
-   */
-  public int getDepth() {
-    return depth;
+  private TableSpec.ColumnSpec getColumnSpec(int columnPageIndex,
+      LocalDictionaryGenerator localDictionaryGenerator) {
+    if ((localDictionaryGenerator == null) && isColumnPageBasedOnDataType(columnPageIndex)) {
+      return TableSpec.ColumnSpec
+          .newInstance(complexColumnInfoList.get(columnPageIndex).getColumnNames(),
+              complexColumnInfoList.get(columnPageIndex).getColumnDataTypes(),
+              complexColumnInfoList.get(columnPageIndex).getComplexColumnType());
+    } else {
+      return TableSpec.ColumnSpec
+          .newInstance(complexColumnInfoList.get(columnPageIndex).getColumnNames(),
+              DataTypes.BYTE_ARRAY,
+              complexColumnInfoList.get(columnPageIndex).getComplexColumnType());
+    }
+  }
+
+  private boolean isColumnPageBasedOnDataType(int columnPageIndex) {
+    DataType dataType = complexColumnInfoList.get(columnPageIndex).getColumnDataTypes();
+    if ((complexColumnInfoList.get(columnPageIndex).isNoDictionary() &&
+        !((DataTypes.isStructType(dataType) ||
+            DataTypes.isArrayType(dataType) ||
+            (dataType == DataTypes.STRING) ||
+            (dataType == DataTypes.VARCHAR) ||
+            (dataType == DataTypes.DATE) ||
+            DataTypes.isDecimal(dataType))))) {
+      // For all these above condition the ColumnPage should be Taken as BYTE_ARRAY
+      // for all other cases make Column Page Based on each DataType.
+      return true;
+    } else {
+      return false;
+    }
   }
 
   /**
-   * return the type of complex column
-   * @param isDepth
-   * @return co plex column type
+   *
+   * @return complexColumnIndex
    */
-  public ColumnType getComplexColumnType(int isDepth) {
-    return complexColumnType.get(isDepth);
+  public int getComplexColumnIndex() {
+    return complexColumnIndex;
   }
 
   /**
    * method to add complex column data
    * @param depth
-   * depth of column
+   * complexColumnIndex of column
    * @param dataList
    * dataList
    */
   public void putComplexData(int depth, List<byte[]> dataList) {
-    assert (depth <= this.depth);
-    int currentNumber = currentRowIdList[depth];
-    for (int i = 0; i < dataList.size(); i++) {
-      columnPages[depth].putData(currentNumber, dataList.get(i));
-      currentNumber++;
+    assert (depth <= this.complexColumnIndex);
+    int positionNumber = currentRowIdList[depth];
+    for (byte[] value : dataList) {
+      if (columnPages[depth].getDataType() != DataTypes.BYTE_ARRAY) {
+        if ((value == null) || (value.length == 0)) {
+          columnPages[depth].putNull(positionNumber);
+          columnPages[depth].statsCollector.updateNull(positionNumber);
+          columnPages[depth].nullBitSet.set(positionNumber);
+        } else {
+          columnPages[depth].putData(positionNumber, DataTypeUtil
+              .getDataBasedOnDataTypeForNoDictionaryColumn(value,
+                  columnPages[depth].getColumnSpec().getSchemaDataType(), false));
+        }
+      } else {
+        columnPages[depth].putData(positionNumber, value);
+      }
+      positionNumber++;
     }
-    currentRowIdList[depth] = currentNumber;
+    currentRowIdList[depth] = positionNumber;
   }
 
   /**
    * to free the used memory
    */
   public void freeMemory() {
-    for (int i = 0; i < depth; i++) {
+    for (int i = 0; i < complexColumnIndex; i++) {
       columnPages[i].freeMemory();
     }
   }
 
   /**
    * return the column page
-   * @param depth
-   * depth of column
+   * @param complexColumnIndex
+   * complexColumnIndex of column
    * @return colum page
    */
-  public ColumnPage getColumnPage(int depth) {
-    assert (depth <= this.depth);
-    return columnPages[depth];
+  public ColumnPage getColumnPage(int complexColumnIndex) {
+    assert (complexColumnIndex <= this.complexColumnIndex);
+    return columnPages[complexColumnIndex];
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
index 32846a1..e16eb93 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FallbackColumnPageEncoder.java
@@ -59,9 +59,11 @@ public class FallbackColumnPageEncoder implements Callable<FallbackEncodedColumn
     TableSpec.ColumnSpec columnSpec = encodedColumnPage.getActualPage().getColumnSpec();
     switch (columnSpec.getColumnType()) {
       case COMPLEX_ARRAY:
-      case COMPLEX_PRIMITIVE:
       case COMPLEX_STRUCT:
       case COMPLEX:
+        throw new RuntimeException("Unsupported DataType. Only COMPLEX_PRIMITIVE should come");
+
+      case COMPLEX_PRIMITIVE:
         // for complex type column
         newEncodedColumnPage = ColumnPageEncoder.encodedColumn(
             encodedColumnPage.getActualPage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 2304614..8f0d934 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -40,9 +40,12 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   private byte[] shortIntData;
   private byte[][] fixedLengthdata;
 
+  // total number of entries in array
+  private int arrayElementCount = 0;
 
   SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
+    this.fixedLengthdata = new byte[pageSize][];
   }
 
   SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
@@ -50,13 +53,14 @@ public class SafeFixLengthColumnPage extends ColumnPage {
     super(columnSpec, dataType, pageSize);
     this.fixedLengthdata = new byte[pageSize][];
   }
-
   /**
    * Set byte value at rowId
    */
   @Override
   public void putByte(int rowId, byte value) {
+    ensureArraySize(rowId, DataTypes.BYTE);
     byteData[rowId] = value;
+    arrayElementCount++;
   }
 
   /**
@@ -64,7 +68,9 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void putShort(int rowId, short value) {
+    ensureArraySize(rowId, DataTypes.SHORT);
     shortData[rowId] = value;
+    arrayElementCount++;
   }
 
   /**
@@ -72,7 +78,9 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void putInt(int rowId, int value) {
+    ensureArraySize(rowId, DataTypes.INT);
     intData[rowId] = value;
+    arrayElementCount++;
   }
 
   /**
@@ -80,7 +88,9 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void putLong(int rowId, long value) {
+    ensureArraySize(rowId, DataTypes.LONG);
     longData[rowId] = value;
+    arrayElementCount++;
   }
 
   /**
@@ -88,7 +98,9 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void putDouble(int rowId, double value) {
+    ensureArraySize(rowId, DataTypes.DOUBLE);
     doubleData[rowId] = value;
+    arrayElementCount++;
   }
 
   /**
@@ -101,8 +113,10 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void putShortInt(int rowId, int value) {
+    ensureArraySize(rowId, DataTypes.SHORT_INT);
     byte[] converted = ByteUtil.to3Bytes(value);
     System.arraycopy(converted, 0, shortIntData, rowId * 3, 3);
+    arrayElementCount++;
   }
 
   @Override
@@ -291,8 +305,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   /**
    * Set int values to page
    */
-  @Override
-  public void setIntPage(int[] intData) {
+  @Override public void setIntPage(int[] intData) {
     this.intData = intData;
   }
 
@@ -346,27 +359,27 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
     if (dataType == DataTypes.BYTE) {
-      for (int i = 0; i < pageSize; i++) {
+      for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, byteData[i]);
       }
     } else if (dataType == DataTypes.SHORT) {
-      for (int i = 0; i < pageSize; i++) {
+      for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, shortData[i]);
       }
     } else if (dataType == DataTypes.INT) {
-      for (int i = 0; i < pageSize; i++) {
+      for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, intData[i]);
       }
     } else if (dataType == DataTypes.LONG) {
-      for (int i = 0; i < pageSize; i++) {
+      for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, longData[i]);
       }
     } else if (dataType == DataTypes.FLOAT) {
-      for (int i = 0; i < pageSize; i++) {
+      for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, floatData[i]);
       }
     } else if (dataType == DataTypes.DOUBLE) {
-      for (int i = 0; i < pageSize; i++) {
+      for (int i = 0; i < arrayElementCount; i++) {
         codec.encode(i, doubleData[i]);
       }
     } else {
@@ -375,4 +388,52 @@ public class SafeFixLengthColumnPage extends ColumnPage {
     }
   }
 
+  /**
+   * Grows the backing array that belongs to {@code dataType} so that index
+   * {@code requestSize} can be written. Every existing entry is kept and the
+   * array gains at least 16 slots, more if the requested index lies further out.
+   *
+   * @param requestSize row index that is about to be written
+   * @param dataType data type selecting the backing array to check
+   */
+  protected void ensureArraySize(int requestSize, DataType dataType) {
+    if (dataType == DataTypes.BYTE) {
+      if (requestSize >= byteData.length) {
+        byteData = java.util.Arrays.copyOf(byteData, grow(requestSize, byteData.length));
+      }
+    } else if (dataType == DataTypes.SHORT) {
+      if (requestSize >= shortData.length) {
+        shortData = java.util.Arrays.copyOf(shortData, grow(requestSize, shortData.length));
+      }
+    } else if (dataType == DataTypes.SHORT_INT) {
+      if (requestSize >= shortIntData.length / 3) {
+        shortIntData = java.util.Arrays.copyOf(shortIntData,
+            grow(requestSize, shortIntData.length / 3) * 3);
+      }
+    } else if (dataType == DataTypes.INT) {
+      if (requestSize >= intData.length) {
+        intData = java.util.Arrays.copyOf(intData, grow(requestSize, intData.length));
+      }
+    } else if (dataType == DataTypes.LONG) {
+      if (requestSize >= longData.length) {
+        longData = java.util.Arrays.copyOf(longData, grow(requestSize, longData.length));
+      }
+    } else if (dataType == DataTypes.FLOAT) {
+      if (requestSize >= floatData.length) {
+        floatData = java.util.Arrays.copyOf(floatData, grow(requestSize, floatData.length));
+      }
+    } else if (dataType == DataTypes.DOUBLE) {
+      if (requestSize >= doubleData.length) {
+        doubleData = java.util.Arrays.copyOf(doubleData, grow(requestSize, doubleData.length));
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          "not support value conversion on " + dataType + " page");
+    }
+  }
+
+  // next length: at least 16 more slots and always enough to hold index requestSize
+  private static int grow(int requestSize, int currentLength) {
+    return Math.max(requestSize + 1, currentLength + 16);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 7965e93..a4cea5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -46,6 +46,12 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   private int eachRowSize;
 
+  // the length of the bytes added in the page
+  private int totalLength;
+
+  // size of the allocated memory, in bytes
+  private int capacity;
+
   private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
 
   private static final int byteBits = DataTypes.BYTE.getSizeBits();
@@ -69,14 +75,17 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
+      capacity = size;
     } else if (dataType == DataTypes.SHORT_INT) {
       int size = pageSize * 3;
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
+      capacity = size;
     } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) {
       throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
+    totalLength = 0;
   }
 
   UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
@@ -84,6 +93,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
       throws MemoryException {
     this(columnSpec, dataType, pageSize);
     this.eachRowSize = eachRowSize;
+    totalLength = 0;
     if (dataType == DataTypes.BYTE_ARRAY) {
       memoryBlock =
           UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize);
@@ -92,43 +102,89 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
     }
   }
 
+  private void checkDataFileSize() {
+    // 16 is a Watermark in order to stop from overflowing.
+    if (totalLength > (Integer.MAX_VALUE - 16)) {
+      // since we later store a column page in a byte array, so its maximum size is 2GB
+      throw new RuntimeException("Carbondata only support maximum 2GB size for one column page");
+    }
+  }
+
   @Override
   public void putByte(int rowId, byte value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_BYTE);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     long offset = ((long)rowId) << byteBits;
     CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, value);
+    totalLength += ByteUtil.SIZEOF_BYTE;
   }
 
+
+
   @Override
   public void putShort(int rowId, short value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_SHORT); // byte count, like the other put methods
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     long offset = ((long)rowId) << shortBits;
     CarbonUnsafe.getUnsafe().putShort(baseAddress, baseOffset + offset, value);
+    totalLength += ByteUtil.SIZEOF_SHORT;
   }
 
   @Override
   public void putShortInt(int rowId, int value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_SHORT_INT);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     byte[] data = ByteUtil.to3Bytes(value);
     long offset = rowId * 3L;
     CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset, data[0]);
     CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 1, data[1]);
     CarbonUnsafe.getUnsafe().putByte(baseAddress, baseOffset + offset + 2, data[2]);
+    totalLength += ByteUtil.SIZEOF_SHORT_INT;
   }
 
   @Override
   public void putInt(int rowId, int value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_INT);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     long offset = ((long)rowId) << intBits;
     CarbonUnsafe.getUnsafe().putInt(baseAddress, baseOffset + offset, value);
+    totalLength += ByteUtil.SIZEOF_INT;
   }
 
   @Override
   public void putLong(int rowId, long value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_LONG);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     long offset = ((long)rowId) << longBits;
     CarbonUnsafe.getUnsafe().putLong(baseAddress, baseOffset + offset, value);
+    totalLength += ByteUtil.SIZEOF_LONG;
   }
 
   @Override
   public void putDouble(int rowId, double value) {
+    try {
+      ensureMemory(ByteUtil.SIZEOF_DOUBLE);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     long offset = ((long)rowId) << doubleBits;
     CarbonUnsafe.getUnsafe().putDouble(baseAddress, baseOffset + offset, value);
+    totalLength += ByteUtil.SIZEOF_DOUBLE;
   }
 
   @Override
@@ -307,42 +363,49 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   public void setBytePage(byte[] byteData) {
     CarbonUnsafe.getUnsafe().copyMemory(byteData, CarbonUnsafe.BYTE_ARRAY_OFFSET,
         baseAddress, baseOffset, byteData.length << byteBits);
+    capacity = byteData.length;
   }
 
   @Override
   public void setShortPage(short[] shortData) {
     CarbonUnsafe.getUnsafe().copyMemory(shortData, CarbonUnsafe.SHORT_ARRAY_OFFSET,
         baseAddress, baseOffset, shortData.length << shortBits);
+    capacity = shortData.length << shortBits; // capacity is counted in bytes
   }
 
   @Override
   public void setShortIntPage(byte[] shortIntData) {
     CarbonUnsafe.getUnsafe().copyMemory(shortIntData, CarbonUnsafe.BYTE_ARRAY_OFFSET,
         baseAddress, baseOffset, shortIntData.length);
+    capacity = shortIntData.length;
   }
 
   @Override
   public void setIntPage(int[] intData) {
     CarbonUnsafe.getUnsafe().copyMemory(intData, CarbonUnsafe.INT_ARRAY_OFFSET,
         baseAddress, baseOffset, intData.length << intBits);
+    capacity = intData.length << intBits; // capacity is counted in bytes
   }
 
   @Override
   public void setLongPage(long[] longData) {
     CarbonUnsafe.getUnsafe().copyMemory(longData, CarbonUnsafe.LONG_ARRAY_OFFSET,
         baseAddress, baseOffset, longData.length << longBits);
+    capacity = longData.length << longBits; // capacity is counted in bytes
   }
 
   @Override
   public void setFloatPage(float[] floatData) {
     CarbonUnsafe.getUnsafe().copyMemory(floatData, CarbonUnsafe.FLOAT_ARRAY_OFFSET,
         baseAddress, baseOffset, floatData.length << floatBits);
+    capacity = floatData.length << floatBits; // capacity is counted in bytes
   }
 
   @Override
   public void setDoublePage(double[] doubleData) {
     CarbonUnsafe.getUnsafe().copyMemory(doubleData, CarbonUnsafe.DOUBLE_ARRAY_OFFSET,
         baseAddress, baseOffset, doubleData.length << doubleBits);
+    capacity = doubleData.length << doubleBits; // capacity is counted in bytes
   }
 
   @Override
@@ -359,48 +422,65 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
     }
   }
 
-  @Override
-  public void convertValue(ColumnPageValueConverter codec) {
-    int pageSize = getPageSize();
+  @Override public void convertValue(ColumnPageValueConverter codec) {
+    int endLoop = getEndLoop();
     if (dataType == DataTypes.BYTE) {
-      for (long i = 0; i < pageSize; i++) {
+      for (long i = 0; i < endLoop; i++) {
         long offset = i << byteBits;
-        codec.encode((int)i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
+        codec.encode((int) i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
       }
     } else if (dataType == DataTypes.SHORT) {
-      for (long i = 0; i < pageSize; i++) {
+      for (long i = 0; i < endLoop; i++) {
         long offset = i << shortBits;
-        codec.encode((int)i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
+        codec.encode((int) i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
       }
     } else if (dataType == DataTypes.INT) {
-      for (long i = 0; i < pageSize; i++) {
+      for (long i = 0; i < endLoop; i++) {
         long offset = i << intBits;
-        codec.encode((int)i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
+        codec.encode((int) i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
       }
     } else if (dataType == DataTypes.LONG) {
-      for (long i = 0; i < pageSize; i++) {
+      for (long i = 0; i < endLoop; i++) {
         long offset = i << longBits;
-        codec.encode((int)i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
+        codec.encode((int) i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
       }
     } else if (dataType == DataTypes.FLOAT) {
-      for (long i = 0; i < pageSize; i++) {
+      for (long i = 0; i < endLoop; i++) {
         long offset = i << floatBits;
-        codec.encode((int)i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
+        codec.encode((int) i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
       }
     } else if (dataType == DataTypes.DOUBLE) {
-      for (long i = 0; i < pageSize; i++) {
+      for (long i = 0; i < endLoop; i++) {
         long offset = i << doubleBits;
-        codec.encode((int)i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
+        codec.encode((int) i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
       }
     } else {
       throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
   }
 
+  private int getEndLoop() {
+    if (dataType == DataTypes.BYTE) {
+      return totalLength / ByteUtil.SIZEOF_BYTE;
+    } else if (dataType == DataTypes.SHORT) {
+      return totalLength / ByteUtil.SIZEOF_SHORT;
+    } else if (dataType == DataTypes.INT) {
+      return totalLength / ByteUtil.SIZEOF_INT;
+    } else if (dataType == DataTypes.LONG) {
+      return totalLength / ByteUtil.SIZEOF_LONG;
+    } else if (dataType == DataTypes.FLOAT) {
+      return totalLength / DataTypes.FLOAT.getSizeInBytes();
+    } else if (dataType == DataTypes.DOUBLE) {
+      return totalLength / DataTypes.DOUBLE.getSizeInBytes();
+    } else {
+      throw new UnsupportedOperationException("invalid data type: " + dataType);
+    }
+  }
+
   @Override public byte[] compress(Compressor compressor) throws MemoryException, IOException {
     if (UnsafeMemoryManager.isOffHeap()) {
       // use raw compression and copy to byte[]
-      int inputSize = pageSize * dataType.getSizeInBytes();
+      int inputSize = totalLength;
       int compressedMaxSize = compressor.maxCompressedLength(inputSize);
       MemoryBlock compressed =
           UnsafeMemoryManager.allocateMemoryWithRetry(taskId, compressedMaxSize);
@@ -416,4 +496,22 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
       return super.compress(compressor);
     }
   }
+
+  /**
+   * reallocate memory if the capacity is less than the current size plus the request size
+   */
+  protected void ensureMemory(int requestSize) throws MemoryException {
+    checkDataFileSize();
+    if (totalLength + requestSize > capacity) {
+      int newSize = Math.max(2 * capacity, totalLength + requestSize);
+      MemoryBlock newBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, newSize);
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset,
+          newBlock.getBaseObject(), newBlock.getBaseOffset(), totalLength);
+      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+      memoryBlock = newBlock;
+      baseAddress = newBlock.getBaseObject();
+      baseOffset = newBlock.getBaseOffset();
+      capacity = newSize;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index f53024a..b5a63f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -24,6 +24,10 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -40,8 +44,17 @@ import org.apache.carbondata.format.LocalDictionaryChunk;
 import org.apache.carbondata.format.LocalDictionaryChunkMeta;
 import org.apache.carbondata.format.PresenceMeta;
 
+import static org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory.selectCodecByAlgorithmForFloating;
+import static org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory.selectCodecByAlgorithmForIntegral;
+
 public abstract class ColumnPageEncoder {
 
+  /**
+   * logger
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ColumnPageEncoder.class.getName());
+
   protected abstract byte[] encodeData(ColumnPage input) throws MemoryException, IOException;
 
   protected abstract List<Encoding> getEncodingList();
@@ -135,9 +148,9 @@ public abstract class ColumnPageEncoder {
    */
   public static EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input)
       throws IOException, MemoryException {
-    EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getDepth()];
+    EncodedColumnPage[] encodedPages = new EncodedColumnPage[input.getComplexColumnIndex()];
     int index = 0;
-    while (index < input.getDepth()) {
+    while (index < input.getComplexColumnIndex()) {
       ColumnPage subColumnPage = input.getColumnPage(index);
       encodedPages[index] = encodedColumn(subColumnPage);
       index++;
@@ -147,10 +160,43 @@ public abstract class ColumnPageEncoder {
 
   public static EncodedColumnPage encodedColumn(ColumnPage page)
       throws IOException, MemoryException {
-    ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
-    return encoder.encode(page);
+    ColumnPageEncoder pageEncoder = createCodecForDimension(page);
+    if (pageEncoder == null) {
+      ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
+      return encoder.encode(page);
+    } else {
+      LOGGER.debug("Encoder result ---> Source data type: " + pageEncoder.getEncoderMeta(page)
+          .getColumnSpec().getSchemaDataType() + " Destination data type: " + pageEncoder
+          .getEncoderMeta(page).getStoreDataType() + " for the column: " + pageEncoder
+          .getEncoderMeta(page).getColumnSpec().getFieldName());
+
+      return pageEncoder.encode(page);
+    }
   }
 
+  private static ColumnPageEncoder createCodecForDimension(ColumnPage inputPage) {
+    TableSpec.ColumnSpec columnSpec = inputPage.getColumnSpec();
+    if (columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+      if (inputPage.getDataType() == DataTypes.BYTE_ARRAY
+          || inputPage.getDataType() == DataTypes.STRING) {
+        // use legacy encoder
+        return null;
+      } else if ((inputPage.getDataType() == DataTypes.BYTE) || (inputPage.getDataType()
+          == DataTypes.SHORT) || (inputPage.getDataType() == DataTypes.INT) || (
+          inputPage.getDataType() == DataTypes.LONG)) {
+        return selectCodecByAlgorithmForIntegral(inputPage.getStatistics(), true)
+            .createEncoder(null);
+      } else if ((inputPage.getDataType() == DataTypes.FLOAT) || (inputPage.getDataType()
+          == DataTypes.DOUBLE)) {
+        return selectCodecByAlgorithmForFloating(inputPage.getStatistics(), true)
+            .createEncoder(null);
+      }
+    }
+    // use legacy encoder
+    return null;
+  }
+
+
   /**
    * Below method to encode the dictionary page
    * @param dictionaryPage

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 659feb0..4e04186 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -118,7 +118,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       out.writeInt((int) getMaxValue());
       out.writeInt((int) getMinValue());
       out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       out.writeLong((Long) getMaxValue());
       out.writeLong((Long) getMinValue());
       out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
@@ -167,7 +167,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       this.setMaxValue(in.readInt());
       this.setMinValue(in.readInt());
       in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       this.setMaxValue(in.readLong());
       this.setMinValue(in.readLong());
       in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 816b01f..dc79b13 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.Compl
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
 import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -120,12 +121,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
         dataType == DataTypes.SHORT ||
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {
-      return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
+      return selectCodecByAlgorithmForIntegral(stats, false).createEncoder(null);
     } else if (DataTypes.isDecimal(dataType)) {
       return createEncoderForDecimalDataTypeMeasure(columnPage);
-    } else if (dataType == DataTypes.FLOAT ||
-        dataType == DataTypes.DOUBLE) {
-      return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
+    } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
+      return selectCodecByAlgorithmForFloating(stats, false).createEncoder(null);
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
     } else {
@@ -161,13 +161,13 @@ public class DefaultEncodingFactory extends EncodingFactory {
   }
 
   private static DataType fitMinMax(DataType dataType, Object max, Object min) {
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
       return fitLongMinMax((byte) max, (byte) min);
     } else if (dataType == DataTypes.SHORT) {
       return fitLongMinMax((short) max, (short) min);
     } else if (dataType == DataTypes.INT) {
       return fitLongMinMax((int) max, (int) min);
-    } else if (dataType == DataTypes.LONG) {
+    } else if ((dataType == DataTypes.LONG) || (dataType == DataTypes.TIMESTAMP)) {
       return fitLongMinMax((long) max, (long) min);
     } else if (dataType == DataTypes.DOUBLE) {
       return fitLongMinMax((long) (double) max, (long) (double) min);
@@ -209,13 +209,13 @@ public class DefaultEncodingFactory extends EncodingFactory {
   private static DataType fitDelta(DataType dataType, Object max, Object min) {
     // use long data type to calculate delta to avoid overflow
     long value;
-    if (dataType == DataTypes.BYTE) {
+    if (dataType == DataTypes.BYTE || dataType == DataTypes.BOOLEAN) {
       value = (long) (byte) max - (long) (byte) min;
     } else if (dataType == DataTypes.SHORT) {
       value = (long) (short) max - (long) (short) min;
     } else if (dataType == DataTypes.INT) {
       value = (long) (int) max - (long) (int) min;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       value = (long) max - (long) min;
       // The subtraction overflowed iff the operands have opposing signs
       // and the result's sign differs from the minuend.
@@ -249,7 +249,8 @@ public class DefaultEncodingFactory extends EncodingFactory {
    * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
    * size is smaller
    */
-  static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) {
+  static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats,
+      boolean isComplexPrimitive) {
     DataType srcDataType = stats.getDataType();
     DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
     DataType deltaDataType;
@@ -259,12 +260,15 @@ public class DefaultEncodingFactory extends EncodingFactory {
     } else {
       deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
     }
-    // in case of decimal data type check if the decimal converter type is Int or Long and based on
-    // that get size in bytes
-    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType
-        .getSizeInBytes()) {
-      // no effect to use adaptive or delta, use compression only
-      return new DirectCompressCodec(stats.getDataType());
+    // for complex primitive, if source and destination data type is same, use adaptive encoding.
+    if (!isComplexPrimitive) {
+      // in case of decimal datatype, check if the decimal converter type is Int or Long and based
+      // on that get size in bytes
+      if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) == srcDataType
+          .getSizeInBytes()) {
+        // no effect to use adaptive or delta, use compression only
+        return new DirectCompressCodec(stats.getDataType());
+      }
     }
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
@@ -277,19 +281,27 @@ public class DefaultEncodingFactory extends EncodingFactory {
 
   // choose between upscale adaptive encoder or upscale delta adaptive encoder,
   // based on whose target data type size is smaller
-  static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) {
+  static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats,
+      boolean isComplexPrimitive) {
     DataType srcDataType = stats.getDataType();
     double maxValue = (double) stats.getMax();
     double minValue = (double) stats.getMin();
     int decimalCount = stats.getDecimalCount();
 
+    // For a complex-type primitive we should always choose the adaptive path,
+    // as the LV format will be reduced to V-only format. In order to do
+    // that, the decimal count must be the actual count instead of -1.
+    if (isComplexPrimitive && decimalCount == -1 && stats instanceof PrimitivePageStatsCollector) {
+      decimalCount = ((PrimitivePageStatsCollector)stats).getDecimalForComplexPrimitive();
+    }
+
     //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
     //but we can't use -1 to getDatatype, we should use -10000000.
     double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
     if (decimalCount == 0) {
       // short, int, long
-      return selectCodecByAlgorithmForIntegral(stats);
-    } else if (decimalCount < 0) {
+      return selectCodecByAlgorithmForIntegral(stats, false);
+    } else if (decimalCount < 0 && !isComplexPrimitive) {
       return new DirectCompressCodec(DataTypes.DOUBLE);
     } else {
       // double
@@ -299,7 +311,9 @@ public class DefaultEncodingFactory extends EncodingFactory {
           (long) (Math.pow(10, decimalCount) * (maxValue - minValue)));
       if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) {
         return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats);
-      } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes()) {
+      } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes() || (
+          (isComplexPrimitive) && (adaptiveDataType.getSizeInBytes() == DataTypes.DOUBLE
+              .getSizeInBytes()))) {
         return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
       } else {
         return new DirectCompressCodec(DataTypes.DOUBLE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index a661a49..8bc67c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -132,7 +132,8 @@ public abstract class EncodingFactory {
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {
       // create the codec based on algorithm and create decoder by recovering the metadata
-      ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats);
+      ColumnPageCodec codec =
+          DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats, false);
       if (codec instanceof AdaptiveIntegralCodec) {
         AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
@@ -153,7 +154,8 @@ public abstract class EncodingFactory {
       }
     } else if (dataType == DataTypes.FLOAT || dataType == DataTypes.DOUBLE) {
       // create the codec based on algorithm and create decoder by recovering the metadata
-      ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats);
+      ColumnPageCodec codec =
+          DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats, false);
       if (codec instanceof AdaptiveFloatingCodec) {
         AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
         ColumnPageEncoderMeta meta =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index a543f7e..e98397d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -57,7 +57,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       this.max = (short) stats.getMax();
     } else if (srcDataType == DataTypes.INT) {
       this.max = (int) stats.getMax();
-    } else if (srcDataType == DataTypes.LONG) {
+    } else if (srcDataType == DataTypes.LONG || srcDataType == DataTypes.TIMESTAMP) {
       this.max = (long) stats.getMax();
     } else if (srcDataType == DataTypes.DOUBLE) {
       this.max = (long) (double) stats.getMax();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 76cb002..bbac772 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -41,6 +41,9 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   // scale of the double value, apply adaptive encoding if this is positive
   private int decimal;
 
+  // scale of the double value, only for complex primitive.
+  private int decimalCountForComplexPrimitive;
+
   private boolean isFirst = true;
   private BigDecimal zeroDecimal;
 
@@ -64,7 +67,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     } else if (dataType == DataTypes.INT) {
       instance.minInt = (int) meta.getMinValue();
       instance.maxInt = (int) meta.getMaxValue();
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       instance.minLong = (long) meta.getMinValue();
       instance.maxLong = (long) meta.getMaxValue();
     } else if (dataType == DataTypes.DOUBLE) {
@@ -96,7 +99,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     } else if (dataType == DataTypes.INT) {
       instance.minInt = (int) meta.getMinValue();
       instance.maxInt = (int) meta.getMaxValue();
-    } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG
+        || dataType == DataTypes.TIMESTAMP) {
       instance.minLong = (long) meta.getMinValue();
       instance.maxLong = (long) meta.getMaxValue();
     } else if (dataType == DataTypes.DOUBLE) {
@@ -128,7 +132,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     } else if (dataType == DataTypes.INT) {
       minInt = Integer.MAX_VALUE;
       maxInt = Integer.MIN_VALUE;
-    } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LEGACY_LONG || dataType == DataTypes.LONG
+        || dataType == DataTypes.TIMESTAMP) {
       minLong = Long.MAX_VALUE;
       maxLong = Long.MIN_VALUE;
     } else if (dataType == DataTypes.DOUBLE) {
@@ -153,7 +158,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       update((short) value);
     } else if (dataType == DataTypes.INT) {
       update((int) value);
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       update(value);
     } else if (dataType == DataTypes.DOUBLE) {
       update(0d);
@@ -236,6 +241,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     }
     if (decimal >= 0) {
       int decimalCount = getDecimalCount(value);
+      decimalCountForComplexPrimitive = decimalCount;
       if (decimalCount > 5) {
         // If deciaml count is too big, we do not do adaptive encoding.
         // So set decimal to negative value
@@ -246,6 +252,11 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     }
   }
 
+  public int getDecimalForComplexPrimitive() {
+    decimal = decimalCountForComplexPrimitive;
+    return decimalCountForComplexPrimitive;
+  }
+
   @Override
   public void update(BigDecimal decimalValue) {
     if (isFirst) {
@@ -294,7 +305,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return minShort;
     } else if (dataType == DataTypes.INT) {
       return minInt;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       return minLong;
     } else if (dataType == DataTypes.DOUBLE) {
       return minDouble;
@@ -312,7 +323,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return maxShort;
     } else if (dataType == DataTypes.INT) {
       return maxInt;
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       return maxLong;
     } else if (dataType == DataTypes.DOUBLE) {
       return maxDouble;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java
new file mode 100644
index 0000000..e8d7050
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/ComplexColumnInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.row;
+
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Wrapper object to hold the complex column details
+ */
+public class ComplexColumnInfo {
+  private ColumnType complexColumnType;
+  private DataType columnDataTypes;
+  private String columnNames;
+  private boolean isNoDictionary;
+
+  public ComplexColumnInfo(ColumnType complexColumnType, DataType columnDataTypes,
+      String columnNames, boolean isNoDictionary) {
+    this.complexColumnType = complexColumnType;
+    this.columnDataTypes = columnDataTypes;
+    this.columnNames = columnNames;
+    this.isNoDictionary = isNoDictionary;
+  }
+
+  public ColumnType getComplexColumnType() {
+    return complexColumnType;
+  }
+
+  public DataType getColumnDataTypes() {
+    return columnDataTypes;
+  }
+
+  public String getColumnNames() {
+    return columnNames;
+  }
+
+  public boolean isNoDictionary() {
+    return isNoDictionary;
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 948b765..b3f13d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -63,7 +63,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     this.name = name;
     this.parentname = parentname;
     this.isDirectDictionary = isDirectDictionary;
-    this.isDictionary = (dictionary != null && isDirectDictionary == false);
+    this.isDictionary = (dictionary != null && !isDirectDictionary);
     this.directDictGenForDate =
         DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(DataTypes.DATE);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index c8b0f6e..2e87051 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.executor.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -37,6 +38,8 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -873,4 +876,23 @@ public class QueryUtil {
       }
     }
   }
+
+  /**
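+   * Builds the null BitSet from the thrift presence meta by uncompressing
+   * its present bit stream.
+   *
+   * @param presentMetadataThrift thrift presence meta holding the compressed bit stream
+   * @return the decoded BitSet, or an empty BitSet when no bit stream is present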
+   */
+  public static BitSet getNullBitSet(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    final byte[] present_bit_stream = presentMetadataThrift.getPresent_bit_stream();
+    if (null != present_bit_stream) {
+      return BitSet
+          .valueOf(compressor.unCompressByte(present_bit_stream));
+    } else {
+      return new BitSet(1);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 1df60c1..322c80a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -29,11 +29,17 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
  */
 public final class ByteUtil {
 
-  public static final int SIZEOF_LONG = 8;
+  public static final int SIZEOF_BYTE = 1;
+
+  public static final int SIZEOF_SHORT = 2;
+
+  public static final int SIZEOF_SHORT_INT = 3;
 
   public static final int SIZEOF_INT = 4;
 
-  public static final int SIZEOF_SHORT = 2;
+  public static final int SIZEOF_LONG = 8;
+
+  public static final int SIZEOF_DOUBLE = 8;
 
   public static final String UTF8_CSN = StandardCharsets.UTF_8.name();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 789897e..dd34bc6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2471,7 +2471,7 @@ public final class CarbonUtil {
       b.putLong((int) value);
       b.flip();
       return b.array();
-    } else if (dataType == DataTypes.LONG) {
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
       b = ByteBuffer.allocate(8);
       b.putLong((long) value);
       b.flip();
@@ -2486,7 +2486,6 @@ public final class CarbonUtil {
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return (byte[]) value;
     } else if (dataType == DataTypes.STRING
-        || dataType == DataTypes.TIMESTAMP
         || dataType == DataTypes.DATE
         || dataType == DataTypes.VARCHAR) {
       return (byte[]) value;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 0306c02..54b7441 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -430,15 +430,28 @@ public final class DataTypeUtil {
   }
 
   /**
+   * Wrapper for actual getDataBasedOnDataTypeForNoDictionaryColumn.
+   *
+   * @param dataInBytes
+   * @param actualDataType
+   * @return
+   */
+  public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] dataInBytes,
+      DataType actualDataType) {
+    return getDataBasedOnDataTypeForNoDictionaryColumn(dataInBytes, actualDataType, true);
+  }
+
+  /**
    * Below method will be used to convert the data passed to its actual data
    * type
    *
-   * @param dataInBytes    data
-   * @param actualDataType actual data type
+   * @param dataInBytes           data
+   * @param actualDataType        actual data type
+   * @param isTimeStampConversion
    * @return actual data after conversion
    */
   public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] dataInBytes,
-      DataType actualDataType) {
+      DataType actualDataType, boolean isTimeStampConversion) {
     if (null == dataInBytes || Arrays
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) {
       return null;
@@ -467,7 +480,11 @@ public final class DataTypeUtil {
         if (isEmptyByteArray(dataInBytes)) {
           return null;
         }
-        return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length) * 1000L;
+        if (isTimeStampConversion) {
+          return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length) * 1000L;
+        } else {
+          return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length);
+        }
       } else if (actualDataType == DataTypes.DOUBLE) {
         if (isEmptyByteArray(dataInBytes)) {
           return null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/438b4421/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
index 52a4de3..0b8bcc7 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/TestEncodingFactory.java
@@ -38,25 +38,25 @@ public class TestEncodingFactory extends TestCase {
     // for Byte
     primitivePageStatsCollector.update((long) Byte.MAX_VALUE);
     ColumnPageCodec columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.BYTE == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for Short
     primitivePageStatsCollector.update((long) Short.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.SHORT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for int
     primitivePageStatsCollector.update((long) Integer.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for long
     primitivePageStatsCollector.update(Long.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof DirectCompressCodec);
     assert ("DirectCompressCodec".equals(columnPageCodec.getName()));
   }
@@ -67,25 +67,25 @@ public class TestEncodingFactory extends TestCase {
     // for Byte
     primitivePageStatsCollector.update((long) 200);
     ColumnPageCodec columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof AdaptiveDeltaIntegralCodec);
     assert (DataTypes.BYTE == ((AdaptiveDeltaIntegralCodec) columnPageCodec).getTargetDataType());
     // for Short
     primitivePageStatsCollector.update((long) 634767);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.SHORT_INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for int
     primitivePageStatsCollector.update((long) (Integer.MAX_VALUE + 200));
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof AdaptiveIntegralCodec);
     assert (DataTypes.INT == ((AdaptiveIntegralCodec) columnPageCodec).getTargetDataType());
     // for int
     primitivePageStatsCollector.update(Long.MAX_VALUE);
     columnPageCodec =
-        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector);
+        DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(primitivePageStatsCollector, false);
     assert (columnPageCodec instanceof DirectCompressCodec);
     assert ("DirectCompressCodec".equals(columnPageCodec.getName()));
   }


Mime
View raw message