carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [1/5] carbondata git commit: [CARBONDATA-1098] Change page statistics to use exact type and use column page in writer
Date Thu, 20 Jul 2017 11:12:01 GMT
Repository: carbondata
Updated Branches:
  refs/heads/encoding_override 9e4da2a6c -> fecafde85
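
Note on the change: this patch replaces the NodeHolder/EncodedData-centric writer API with EncodedTablePage, so the encoder hands a fully typed encoded page to the fact data writer and any legacy NodeHolder is built internally by the writer. As a rough, illustrative sketch only (the "encoder" and "writer" variables and encode() call below are placeholders, not code from this patch), a caller of the new interface looks like:

  // Illustrative sketch only; "encoder" and "writer" are placeholder names.
  EncodedTablePage page = encoder.encode(tablePage);   // produced upstream of the writer
  writer.writeTablePage(page);                         // replaces writeBlockletData(NodeHolder)
  // ... after all pages of the block are written:
  writer.writeFooterToFile();                          // V3: renamed from writeBlockletInfoToFile()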


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index fab1a39..0f1b52b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -24,17 +24,17 @@ import java.nio.channels.FileChannel;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
+import org.apache.carbondata.core.datastore.page.key.TablePageKey;
+import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.TablePageKey;
-import org.apache.carbondata.processing.store.TablePageStatistics;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 
@@ -47,12 +47,11 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     super(dataWriterVo);
   }
 
-  @Override
-  public NodeHolder buildDataNodeHolder(EncodedData encoded,
-      TablePageStatistics stats, TablePageKey key)
+  protected NodeHolder buildNodeHolder(EncodedTablePage encodedTablePage)
       throws CarbonDataWriterException {
     // if there are no NO-Dictionary column present in the table then
     // set the empty byte array
+    TablePageKey key = encodedTablePage.getPageKey();
     byte[] startKey = key.getStartKey();
     byte[] endKey = key.getEndKey();
     byte[] noDictionaryStartKey = key.getNoDictStartKey();
@@ -70,46 +69,48 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     int totalKeySize = 0;
     int keyBlockSize = 0;
 
-    IndexStorage[] keyStorageArray = encoded.indexStorages;
-    boolean[] isSortedData = new boolean[keyStorageArray.length];
-    int[] keyLengths = new int[keyStorageArray.length];
-    byte[][] allMinValue = new byte[keyStorageArray.length][];
-    byte[][] allMaxValue = new byte[keyStorageArray.length][];
-    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
-    byte[][] keyBlockData = encoded.dimensions;
-    byte[][] measureArray = encoded.measures;
+    int numDimensions = encodedTablePage.getNumDimensions();
+    boolean[] isSortedData = new boolean[numDimensions];
+    int[] keyLengths = new int[numDimensions];
+    int[] keyBlockIdxLengths = new int[numDimensions];
+    byte[][] allMinValue = new byte[numDimensions][];
+    byte[][] allMaxValue = new byte[numDimensions][];
+    byte[][] keyBlockData = NodeHolder.getKeyArray(encodedTablePage);
+    byte[][] measureArray = NodeHolder.getDataArray(encodedTablePage);
+    TablePageStatistics stats = new TablePageStatistics(encodedTablePage.getDimensions(),
+        encodedTablePage.getMeasures());
 
-    for (int i = 0; i < keyLengths.length; i++) {
-      keyLengths[i] = keyBlockData[i].length;
-      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+    EncodedDimensionPage[] dimensions = encodedTablePage.getDimensions();
+    for (int i = 0; i < dimensions.length; i++) {
+      IndexStorage indexStorage = dimensions[i].getIndexStorage();
+      keyLengths[i] = dimensions[i].getEncodedData().length;
+      isSortedData[i] = indexStorage.isAlreadySorted();
       if (!isSortedData[i]) {
         keyBlockSize++;
 
       }
       totalKeySize += keyLengths[i];
+      byte[] min = stats.getDimensionMinValue()[i];
+      byte[] max = stats.getDimensionMaxValue()[i];
       if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
-        allMinValue[i] = keyStorageArray[i].getMin();
-        allMaxValue[i] = keyStorageArray[i].getMax();
+        allMinValue[i] = min;
+        allMaxValue[i] = max;
       } else {
-        allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
-        allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
-      }
-      //if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk
-      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
-        colGrpBlock[i] = true;
+        allMinValue[i] = updateMinMaxForNoDictionary(min);
+        allMaxValue[i] = updateMinMaxForNoDictionary(max);
       }
     }
-    int[] keyBlockIdxLengths = new int[keyBlockSize];
     byte[][] dataAfterCompression = new byte[keyBlockSize][];
     byte[][] indexMap = new byte[keyBlockSize][];
     int idx = 0;
-    for (int i = 0; i < isSortedData.length; i++) {
+    for (int i = 0; i < dimensions.length; i++) {
+      IndexStorage indexStorage = dimensions[i].getIndexStorage();
       if (!isSortedData[i]) {
         dataAfterCompression[idx] =
-            numberCompressor.compress((int[])keyStorageArray[i].getRowIdPage());
-        if (null != keyStorageArray[i].getRowIdRlePage()
-            && ((int[])keyStorageArray[i].getRowIdRlePage()).length > 0) {
-          indexMap[idx] = numberCompressor.compress((int[])keyStorageArray[i].getRowIdRlePage());
+            numberCompressor.compress((int[])indexStorage.getRowIdPage());
+        if (null != indexStorage.getRowIdRlePage()
+            && ((int[])indexStorage.getRowIdRlePage()).length > 0) {
+          indexMap[idx] = numberCompressor.compress((int[])indexStorage.getRowIdRlePage());
         } else {
           indexMap[idx] = new byte[0];
         }
@@ -128,10 +129,11 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     int[] dataIndexMapLength = new int[compressDataBlockSize];
     idx = 0;
     for (int i = 0; i < dataWriterVo.getRleEncodingForDictDim().length; i++) {
+      IndexStorage indexStorage = dimensions[i].getIndexStorage();
       if (dataWriterVo.getRleEncodingForDictDim()[i]) {
         try {
           compressedDataIndex[idx] =
-              numberCompressor.compress((int[])keyStorageArray[i].getDataRlePage());
+              numberCompressor.compress((int[])indexStorage.getDataRlePage());
           dataIndexMapLength[idx] = compressedDataIndex[idx].length;
           idx++;
         } catch (Exception e) {
@@ -154,7 +156,8 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     holder.setMeasureNullValueIndex(stats.getNullBitSet());
     // end key format will be <length of dictionary key><length of no
     // dictionary key><DictionaryKey><No Dictionary key>
-    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+    byte[] updatedNoDictionaryEndKey =
+        encodedTablePage.getPageKey().updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
     ByteBuffer buffer = ByteBuffer.allocate(
         CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
             + endKey.length + updatedNoDictionaryEndKey.length);
@@ -165,7 +168,8 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     buffer.rewind();
     holder.setEndKey(buffer.array());
     holder.setMeasureLenght(msrLength);
-    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+    byte[] updatedNoDictionaryStartKey =
+        encodedTablePage.getPageKey().updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
     // start key format will be <length of dictionary key><length of no
     // dictionary key><DictionaryKey><No Dictionary key>
     buffer = ByteBuffer.allocate(
@@ -185,38 +189,28 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     holder.setCompressedIndexMap(indexMap);
     holder.setDataIndexMapLength(dataIndexMapLength);
     holder.setCompressedDataIndex(compressedDataIndex);
-    holder.setMeasureStats(stats.getMeasurePageStatistics());
     holder.setTotalDimensionArrayLength(totalKeySize);
     holder.setTotalMeasureArrayLength(totalMsrArrySize);
     //setting column min max value
     holder.setDimensionColumnMaxData(allMaxValue);
     holder.setDimensionColumnMinData(allMinValue);
     holder.setRleEncodingForDictDim(dataWriterVo.getRleEncodingForDictDim());
-    holder.setColGrpBlocks(colGrpBlock);
+    holder.setEncodedData(encodedTablePage);
     return holder;
   }
 
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    if (holder.getEntryCount() == 0) {
+  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+      throws CarbonDataWriterException {
+    if (encodedTablePage.getPageSize() == 0) {
       return;
     }
-    int indexBlockSize = 0;
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    }
-
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      indexBlockSize += holder.getDataIndexMapLength()[i];
-    }
-
-    long blockletDataSize =
-        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
-            + indexBlockSize;
+    long blockletDataSize = encodedTablePage.getEncodedSize();
     updateBlockletFileChannel(blockletDataSize);
+    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
     // write data to file and get its offset
-    long offset = writeDataToFile(holder, fileChannel);
+    long offset = writeDataToFile(nodeHolder, fileChannel);
     // get the blocklet info for currently added blocklet
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(nodeHolder, offset);
     // add blocklet info to list
     blockletInfoList.add(blockletInfo);
     LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
@@ -231,6 +225,7 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
    */
   private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
       throws CarbonDataWriterException {
+    int numDimensions = nodeHolder.getKeyArray().length;
     // create byte buffer
     byte[][] compressedIndex = nodeHolder.getCompressedIndex();
     byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
@@ -262,16 +257,17 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
       // add measure data array to byte buffer
 
       ByteBuffer buffer1 = null;
-      for (int i = 0; i < compressedIndex.length; i++) {
-        buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
-        buffer1.putInt(compressedIndex[i].length);
-        buffer1.put(compressedIndex[i]);
-        if (compressedIndexMap[i].length > 0) {
-          buffer1.put(compressedIndexMap[i]);
+      for (int i = 0; i < numDimensions; i++) {
+        if (nodeHolder.getKeyBlockIndexLength()[i] > 0) {
+          buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
+          buffer1.putInt(compressedIndex[i].length);
+          buffer1.put(compressedIndex[i]);
+          if (compressedIndexMap[i].length > 0) {
+            buffer1.put(compressedIndexMap[i]);
+          }
+          buffer1.rewind();
+          byteBuffer.put(buffer1.array());
         }
-        buffer1.rewind();
-        byteBuffer.put(buffer1.array());
-
       }
       for (int i = 0; i < compressedDataIndex.length; i++) {
         byteBuffer.put(compressedDataIndex[i]);
@@ -356,12 +352,7 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
     info.setStartKey(nodeHolder.getStartKey());
     // set end key
     info.setEndKey(nodeHolder.getEndKey());
-    info.setStats(nodeHolder.getStats());
-    // return leaf metadata
-
-    //colGroup Blocks
-    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
+    info.setEncodedTablePage(nodeHolder.getEncodedData());
     return info;
   }
 
@@ -374,7 +365,7 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
       long currentPosition = channel.size();
       CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
       FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFileFooter(blockletInfoList, localCardinality.length, localCardinality,
+          .convertFileFooter(blockletInfoList, localCardinality,
               thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
       fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFileName, currentPosition);
       writer.writeFooter(convertFileMeta, currentPosition);
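
In the V1 writer, min/max and null-bit statistics now come from a TablePageStatistics built over the encoded dimension and measure pages, and the raw byte arrays come from NodeHolder.getKeyArray/getDataArray. The bodies of those helpers are not part of this hunk; a plausible sketch of getKeyArray, inferred only from how V1 consumes it above, is:

  // Assumed shape, inferred from usage in buildNodeHolder; not the actual implementation.
  public static byte[][] getKeyArray(EncodedTablePage page) {
    byte[][] keys = new byte[page.getNumDimensions()][];
    for (int i = 0; i < keys.length; i++) {
      keys[i] = page.getDimension(i).getEncodedData();   // encoded bytes of each dimension column
    }
    return keys;
  }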

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index 82e83d5..65af57f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
@@ -61,17 +62,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
   /**
    * Below method will be used to write the data to carbon data file
    *
-   * @param holder
+   * @param encodedTablePage
    * @throws CarbonDataWriterException any problem in writing operation
    */
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    if (holder.getEntryCount() == 0) {
+  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+      throws CarbonDataWriterException {
+    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
+    if (encodedTablePage.getPageSize() == 0) {
       return;
     }
     // size to calculate the size of the blocklet
     int size = 0;
     // get the blocklet info object
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0);
 
     List<DataChunk2> datachunks = null;
     try {
@@ -89,16 +92,16 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
       size += dataChunkByteArray[i].length;
     }
     // add row id index length
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      size += holder.getKeyBlockIndexLength()[i];
+    for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
+      size += nodeHolder.getKeyBlockIndexLength()[i];
     }
     // add rle index length
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      size += holder.getDataIndexMapLength()[i];
+    for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
+      size += nodeHolder.getDataIndexMapLength()[i];
     }
     // add dimension column data page and measure column data page size
     long blockletDataSize =
-        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+        nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size;
     // if size of the file already reached threshold size then create a new file and get the file
     // channel object
     updateBlockletFileChannel(blockletDataSize);
@@ -117,7 +120,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
       throw new CarbonDataWriterException("Problem while getting the file channel size", e);
     }
     // write data to file and get its offset
-    writeDataToFile(holder, dataChunkByteArray, fileChannel);
+    writeDataToFile(nodeHolder, dataChunkByteArray, fileChannel);
     // add blocklet info to list
     blockletInfoList.add(blockletInfo);
     LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
@@ -132,10 +135,6 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
    * <MColumn1DataChunk><MColumn1DataPage>
    * <MColumn2DataChunk><MColumn2DataPage>
    * <MColumn2DataChunk><MColumn2DataPage>
-   *
-   * @param nodeHolder
-   * @param dataChunksBytes
-   * @param channel
    * @throws CarbonDataWriterException
    */
   private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
@@ -156,11 +155,15 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
     for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add((short) dataChunksBytes[i].length);
-      bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
-          .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
-          dataWriterVo.getRleEncodingForDictDim()[i] ?
-              nodeHolder.getCompressedDataIndex()[rleIndex].length :
-              0);
+      int size1 = (!nodeHolder.getIsSortedKeyBlock()[i] ?
+          nodeHolder.getKeyBlockIndexLength()[rowIdIndex] :
+          0);
+      int size2 = (dataWriterVo.getRleEncodingForDictDim()[i] ?
+          nodeHolder.getCompressedDataIndex()[rleIndex].length :
+          0);
+      bufferSize += dataChunksBytes[i].length +
+          nodeHolder.getKeyLengths()[i] +
+          size1 + size2;
       offset += dataChunksBytes[i].length;
       offset += nodeHolder.getKeyLengths()[i];
       if (!nodeHolder.getIsSortedKeyBlock()[i]) {
@@ -180,14 +183,16 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
       buffer.put(nodeHolder.getKeyArray()[i]);
       if (!nodeHolder.getIsSortedKeyBlock()[i]) {
         buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
-        buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
+        byte[] b1 = nodeHolder.getCompressedIndex()[rowIdIndex];
+        buffer.put(b1);
         if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
           buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
         }
         rowIdIndex++;
       }
       if (dataWriterVo.getRleEncodingForDictDim()[i]) {
-        buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
+        byte[] b2 = nodeHolder.getCompressedDataIndex()[rleIndex];
+        buffer.put(b2);
         rleIndex++;
       }
     }
@@ -230,7 +235,9 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
    *
    * @return BlockletInfo - blocklet metadata
    */
-  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+  protected BlockletInfoColumnar getBlockletInfo(EncodedTablePage encodedTablePage, long offset) {
+    NodeHolder nodeHolder = buildNodeHolder(encodedTablePage);
+
     // create the info object for leaf entry
     BlockletInfoColumnar info = new BlockletInfoColumnar();
     //add rleEncodingForDictDim array
@@ -256,12 +263,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
     info.setStartKey(nodeHolder.getStartKey());
     // set end key
     info.setEndKey(nodeHolder.getEndKey());
-    info.setStats(nodeHolder.getStats());
-    // return leaf metadata
-
-    //colGroup Blocks
-    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
+    info.setEncodedTablePage(encodedTablePage);
     return info;
   }
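
The V2 writer still serializes through a NodeHolder built from the encoded page. The per-dimension packing order (DataChunk2 header, key bytes, optional inverted index, optional RLE index) is easiest to see spelled out; the sketch below is a simplified restatement of the loop in writeDataToFile above, with write()/writeInt() as placeholder I/O helpers:

  // Simplified restatement of the V2 packing loop; write()/writeInt() are placeholders.
  int rowIdIndex = 0;
  int rleIndex = 0;
  for (int i = 0; i < numDimensions; i++) {
    write(dataChunksBytes[i]);                         // DataChunk2 header
    write(nodeHolder.getKeyArray()[i]);                // encoded dimension page
    if (!nodeHolder.getIsSortedKeyBlock()[i]) {        // inverted (row id) index
      writeInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
      write(nodeHolder.getCompressedIndex()[rowIdIndex]);
      if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
        write(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
      }
      rowIdIndex++;
    }
    if (dataWriterVo.getRleEncodingForDictDim()[i]) {  // RLE data index
      write(nodeHolder.getCompressedDataIndex()[rleIndex]);
      rleIndex++;
    }
  }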
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 9afbb55..adb97ae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -26,9 +26,10 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
@@ -36,11 +37,8 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.FileFooter3;
-import org.apache.carbondata.processing.store.TablePageKey;
-import org.apache.carbondata.processing.store.TablePageStatistics;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 
@@ -77,181 +75,6 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     dataWriterHolder = new DataWriterHolder();
   }
 
-  /**
-   * Below method will be used to build the node holder object
-   * This node holder object will be used to persist data which will
-   * be written in carbon data file
-   */
-  @Override public NodeHolder buildDataNodeHolder(EncodedData encoded,
-      TablePageStatistics stats, TablePageKey key) throws CarbonDataWriterException {
-    // if there are no NO-Dictionary column present in the table then
-    // set the empty byte array
-    byte[] startKey = key.getStartKey();
-    byte[] endKey = key.getEndKey();
-    byte[] noDictionaryStartKey = key.getNoDictStartKey();
-    byte[] noDictionaryEndKey = key.getNoDictEndKey();
-    if (null == noDictionaryEndKey) {
-      noDictionaryEndKey = new byte[0];
-    }
-    if (null == noDictionaryStartKey) {
-      noDictionaryStartKey = new byte[0];
-    }
-    // total measure length;
-    int totalMsrArrySize = 0;
-    // current measure length;
-    int currentMsrLenght = 0;
-    int numDimensions = encoded.dimensions.length;
-    int totalKeySize = 0;
-    boolean[] isSortedData = new boolean[numDimensions];
-    int[] keyLengths = new int[numDimensions];
-    boolean[] colGrpBlock = new boolean[numDimensions];
-    int[] keyBlockIdxLengths = new int[numDimensions];
-    byte[][] dataAfterCompression = new byte[numDimensions][];
-    byte[][] indexMap = new byte[numDimensions][];
-    for (int i = 0; i < numDimensions; i++) {
-      isSortedData[i] = encoded.indexStorages[i].isAlreadySorted();
-      keyLengths[i] = encoded.dimensions[i].length;
-      totalKeySize += keyLengths[i];
-      if (!isSortedData[i]) {
-        dataAfterCompression[i] =
-            getByteArray((short[])encoded.indexStorages[i].getRowIdPage());
-        if (null != encoded.indexStorages[i].getRowIdRlePage() &&
-            ((short[])encoded.indexStorages[i].getRowIdRlePage()).length > 0) {
-          indexMap[i] = getByteArray((short[])encoded.indexStorages[i].getRowIdRlePage());
-        } else {
-          indexMap[i] = new byte[0];
-        }
-        keyBlockIdxLengths[i] = (dataAfterCompression[i].length + indexMap[i].length)
-            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-      }
-      // if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk
-      if (encoded.indexStorages[i] instanceof ColGroupBlockStorage) {
-        colGrpBlock[i] = true;
-      }
-    }
-    byte[][] compressedDataIndex = new byte[numDimensions][];
-    int[] dataIndexMapLength = new int[numDimensions];
-    for (int i = 0; i < dataWriterVo.getRleEncodingForDictDim().length; i++) {
-      if (dataWriterVo.getRleEncodingForDictDim()[i]) {
-        try {
-          compressedDataIndex[i] =
-              getByteArray((short[])encoded.indexStorages[i].getDataRlePage());
-          dataIndexMapLength[i] = compressedDataIndex[i].length;
-        } catch (Exception e) {
-          throw new CarbonDataWriterException(e.getMessage(), e);
-        }
-      }
-    }
-    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
-    // calculate the total size required for all the measure and get the
-    // each measure size
-    for (int i = 0; i < encoded.measures.length; i++) {
-      currentMsrLenght = encoded.measures[i].length;
-      totalMsrArrySize += currentMsrLenght;
-      msrLength[i] = currentMsrLenght;
-    }
-    NodeHolder holder = new NodeHolder();
-    holder.setDataArray(encoded.measures);
-    holder.setKeyArray(encoded.dimensions);
-    holder.setMeasureNullValueIndex(stats.getNullBitSet());
-    // end key format will be <length of dictionary key><length of no
-    // dictionary key><DictionaryKey><No Dictionary key>
-    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
-    ByteBuffer buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
-            + endKey.length + updatedNoDictionaryEndKey.length);
-    buffer.putInt(endKey.length);
-    buffer.putInt(updatedNoDictionaryEndKey.length);
-    buffer.put(endKey);
-    buffer.put(updatedNoDictionaryEndKey);
-    buffer.rewind();
-    holder.setEndKey(buffer.array());
-    holder.setMeasureLenght(msrLength);
-    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
-    // start key format will be <length of dictionary key><length of no
-    // dictionary key><DictionaryKey><No Dictionary key>
-    buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
-            + startKey.length + updatedNoDictionaryStartKey.length);
-    buffer.putInt(startKey.length);
-    buffer.putInt(updatedNoDictionaryStartKey.length);
-    buffer.put(startKey);
-    buffer.put(updatedNoDictionaryStartKey);
-    buffer.rewind();
-    holder.setStartKey(buffer.array());
-    holder.setEntryCount(key.getPageSize());
-    holder.setKeyLengths(keyLengths);
-    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
-    holder.setIsSortedKeyBlock(isSortedData);
-    holder.setCompressedIndex(dataAfterCompression);
-    holder.setCompressedIndexMap(indexMap);
-    holder.setDataIndexMapLength(dataIndexMapLength);
-    holder.setCompressedDataIndex(compressedDataIndex);
-    holder.setMeasureStats(stats.getMeasurePageStatistics());
-    holder.setTotalDimensionArrayLength(totalKeySize);
-    holder.setTotalMeasureArrayLength(totalMsrArrySize);
-    holder.setMeasureColumnMaxData(stats.getMeasureMaxValue());
-    holder.setMeasureColumnMinData(stats.getMeasureMinValue());
-    holder.setDimensionColumnMaxData(stats.getDimensionMaxValue());
-    holder.setDimensionColumnMinData(stats.getDimensionMinValue());
-    holder.setRleEncodingForDictDim(dataWriterVo.getRleEncodingForDictDim());
-    holder.setColGrpBlocks(colGrpBlock);
-    List<byte[]> dimensionDataChunk2 = null;
-    List<byte[]> measureDataChunk2 = null;
-    try {
-      dimensionDataChunk2 = CarbonMetadataUtil
-          .getDataChunk2(holder, thriftColumnSchemaList, dataWriterVo.getSegmentProperties(), true);
-      measureDataChunk2 = CarbonMetadataUtil
-          .getDataChunk2(holder, thriftColumnSchemaList, dataWriterVo.getSegmentProperties(),
-              false);
-
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(e.getMessage());
-    }
-    holder.setHolderSize(calculateSize(holder, dimensionDataChunk2, measureDataChunk2));
-    return holder;
-  }
-
-  private int calculateSize(NodeHolder holder, List<byte[]> dimensionDataChunk2,
-      List<byte[]> measureDataChunk2) {
-    int size = 0;
-    // add row id index length
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      if (!holder.getIsSortedKeyBlock()[i]) {
-        size += holder.getKeyBlockIndexLength()[i];
-      }
-    }
-    // add rle index length
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      if (holder.getRleEncodingForDictDim()[i]) {
-        size += holder.getDataIndexMapLength()[i];
-      }
-    }
-    for (int i = 0; i < dimensionDataChunk2.size(); i++) {
-      size += dimensionDataChunk2.get(i).length;
-    }
-    for (int i = 0; i < measureDataChunk2.size(); i++) {
-      size += measureDataChunk2.get(i).length;
-    }
-    size += holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength();
-    return size;
-  }
-
-  /**
-   * Below method will be used to convert short array to byte array
-   *
-   * @param data in short data
-   * @return byte array
-   */
-  private byte[] getByteArray(short[] data) {
-    ByteBuffer buffer = ByteBuffer.allocate(data.length * 2);
-    for (int i = 0; i < data.length; i++) {
-      buffer.putShort(data[i]);
-    }
-    buffer.flip();
-    return buffer.array();
-  }
-
   @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
       throws CarbonDataWriterException {
     try {
@@ -277,65 +100,65 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
   }
 
   /**
-   * Below method will be used to write blocklet data to file
+   * Below method will be used to write one table page data
    */
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    // check the number of pages present in data holder, if pages is exceeding threshold
-    // it will write the pages to file
+  @Override public void writeTablePage(EncodedTablePage encodedTablePage)
+      throws CarbonDataWriterException {
     // condition for writting all the pages
-    if (!holder.isWriteAll()) {
+    if (!encodedTablePage.isLastPage()) {
       boolean isAdded = false;
-      // check if size more than blocklet size then write the page
-      if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize) {
+      // check if size more than blocklet size then write the page to file
+      if (dataWriterHolder.getSize() + encodedTablePage.getEncodedSize() >= blockletSize) {
         // if one page size is more than blocklet size
-        if (dataWriterHolder.getNodeHolder().size() == 0) {
+        if (dataWriterHolder.getEncodedTablePages().size() == 0) {
           isAdded = true;
-          dataWriterHolder.addNodeHolder(holder);
+          dataWriterHolder.addPage(encodedTablePage);
         }
 
         LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
             + " :Rows Added: " + dataWriterHolder.getTotalRows());
         // write the data
-        writeDataToFile(fileChannel);
+        writeBlockletToFile();
       }
       if (!isAdded) {
-        dataWriterHolder.addNodeHolder(holder);
+        dataWriterHolder.addPage(encodedTablePage);
       }
     } else {
       //for last blocklet check if the last page will exceed the blocklet size then write
       // existing pages and then last page
-      if (holder.getEntryCount() > 0) {
-        dataWriterHolder.addNodeHolder(holder);
+      if (encodedTablePage.getPageSize() > 0) {
+        dataWriterHolder.addPage(encodedTablePage);
       }
       if (dataWriterHolder.getNumberOfPagesAdded() > 0) {
         LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded()
             + " :Rows Added: " + dataWriterHolder.getTotalRows());
-        writeDataToFile(fileChannel);
+        writeBlockletToFile();
       }
     }
   }
 
-  private void writeDataToFile(FileChannel channel) {
-    // get the list of node holder list
-    List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
+  /**
+   * Write one blocklet data to file
+   */
+  private void writeBlockletToFile() {
+    // get the list of all encoded table page
+    List<EncodedTablePage> encodedTablePageList = dataWriterHolder.getEncodedTablePages();
+    int numDimensions = encodedTablePageList.get(0).getNumDimensions();
+    int numMeasures = encodedTablePageList.get(0).getNumMeasures();
     long blockletDataSize = 0;
     // get data chunks for all the column
-    byte[][] dataChunkBytes =
-        new byte[nodeHolderList.get(0).getKeyArray().length + nodeHolderList.get(0)
-            .getDataArray().length][];
-    int measureStartIndex = nodeHolderList.get(0).getKeyArray().length;
+    byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][];
+    int measureStartIndex = numDimensions;
     // calculate the size of data chunks
     try {
-      for (int i = 0; i < nodeHolderList.get(0).getKeyArray().length; i++) {
+      for (int i = 0; i < numDimensions; i++) {
         dataChunkBytes[i] = CarbonUtil.getByteArray(
-            CarbonMetadataUtil.getDataChunk3(nodeHolderList, thriftColumnSchemaList,
-                dataWriterVo.getSegmentProperties(), i, true));
+            CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i));
         blockletDataSize += dataChunkBytes[i].length;
       }
-      for (int i = 0; i < nodeHolderList.get(0).getDataArray().length; i++) {
-        dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(CarbonMetadataUtil
-            .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
-                dataWriterVo.getSegmentProperties(), i, false));
+      for (int i = 0; i < numMeasures; i++) {
+        dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(
+            CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i));
         blockletDataSize += dataChunkBytes[measureStartIndex].length;
         measureStartIndex++;
       }
@@ -346,117 +169,96 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     blockletDataSize += dataWriterHolder.getSize();
     // to check if data size will exceed the block size then create a new file
     updateBlockletFileChannel(blockletDataSize);
+
     // write data to file
-    writeDataToFile(fileChannel, dataChunkBytes);
+    try {
+      if (fileChannel.size() == 0) {
+        // write the header if file is empty
+        writeHeaderToFile(fileChannel);
+      }
+      writeBlockletToFile(fileChannel, dataChunkBytes);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem when writing file", e);
+    }
     // clear the data holder
     dataWriterHolder.clear();
   }
 
   /**
-   * Below method will be used to write data in carbon data file
-   * Data Format
+   * write file header
+   */
+  private void writeHeaderToFile(FileChannel channel) throws IOException {
+    byte[] fileHeader = CarbonUtil.getByteArray(
+        CarbonMetadataUtil.getFileHeader(
+            true, thriftColumnSchemaList, dataWriterVo.getSchemaUpdatedTimeStamp()));
+    ByteBuffer buffer = ByteBuffer.wrap(fileHeader);
+    channel.write(buffer);
+  }
+
+  /**
+   * Write one blocklet data into file
+   * File format:
    * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
    * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
    * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
    * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
-   * Each page will contain column data, Inverted index and rle index
-   *
-   * @param channel
-   * @param dataChunkBytes
    */
-  private void writeDataToFile(FileChannel channel, byte[][] dataChunkBytes) {
-    long offset = 0;
-    // write the header
-    try {
-      if (fileChannel.size() == 0) {
-        // below code is to write the file header
-        byte[] fileHeader = CarbonUtil.getByteArray(CarbonMetadataUtil
-            .getFileHeader(true, thriftColumnSchemaList, dataWriterVo.getSchemaUpdatedTimeStamp()));
-        ByteBuffer buffer = ByteBuffer.wrap(fileHeader);
-        fileChannel.write(buffer);
-      }
-      offset = channel.size();
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the file channel size");
-    }
+  private void writeBlockletToFile(FileChannel channel, byte[][] dataChunkBytes)
+      throws IOException {
+    long offset = channel.size();
     // to maintain the offset of each data chunk in blocklet
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
     List<Integer> currentDataChunksLength = new ArrayList<>();
-    // get the node holder list
-    List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
-    int numberOfDimension = nodeHolderList.get(0).getKeyArray().length;
-    int numberOfMeasures = nodeHolderList.get(0).getDataArray().length;
-    NodeHolder nodeHolder = null;
+    List<EncodedTablePage> encodedTablePages = dataWriterHolder.getEncodedTablePages();
+    int numberOfDimension = encodedTablePages.get(0).getNumDimensions();
+    int numberOfMeasures = encodedTablePages.get(0).getNumMeasures();
     ByteBuffer buffer = null;
-    int bufferSize = 0;
     long dimensionOffset = 0;
     long measureOffset = 0;
     int numberOfRows = 0;
-    long totalSize = 0;
     // calculate the number of rows in each blocklet
-    for (int j = 0; j < nodeHolderList.size(); j++) {
-      numberOfRows += nodeHolderList.get(j).getEntryCount();
-      totalSize += nodeHolderList.get(j).getHolderSize();
+    for (EncodedTablePage encodedTablePage : encodedTablePages) {
+      numberOfRows += encodedTablePage.getPageSize();
     }
-    try {
-      for (int i = 0; i < numberOfDimension; i++) {
-        currentDataChunksOffset.add(offset);
-        currentDataChunksLength.add(dataChunkBytes[i].length);
-        buffer = ByteBuffer.wrap(dataChunkBytes[i]);
-        fileChannel.write(buffer);
-        offset += dataChunkBytes[i].length;
-        for (int j = 0; j < nodeHolderList.size(); j++) {
-          nodeHolder = nodeHolderList.get(j);
-          bufferSize = nodeHolder.getKeyLengths()[i] + (!nodeHolder.getIsSortedKeyBlock()[i] ?
-              nodeHolder.getKeyBlockIndexLength()[i] :
-              0) + (dataWriterVo.getRleEncodingForDictDim()[i] ?
-              nodeHolder.getCompressedDataIndex()[i].length :
-              0);
-          buffer = ByteBuffer.allocate(bufferSize);
-          buffer.put(nodeHolder.getKeyArray()[i]);
-          if (!nodeHolder.getIsSortedKeyBlock()[i]) {
-            buffer.putInt(nodeHolder.getCompressedIndex()[i].length);
-            buffer.put(nodeHolder.getCompressedIndex()[i]);
-            if (nodeHolder.getCompressedIndexMap()[i].length > 0) {
-              buffer.put(nodeHolder.getCompressedIndexMap()[i]);
-            }
-          }
-          if (nodeHolder.getRleEncodingForDictDim()[i]) {
-            buffer.put(nodeHolder.getCompressedDataIndex()[i]);
-          }
-          buffer.flip();
-          fileChannel.write(buffer);
-          offset += bufferSize;
-        }
+    for (int i = 0; i < numberOfDimension; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add(dataChunkBytes[i].length);
+      buffer = ByteBuffer.wrap(dataChunkBytes[i]);
+      channel.write(buffer);
+      offset += dataChunkBytes[i].length;
+      for (EncodedTablePage encodedTablePage : encodedTablePages) {
+        EncodedDimensionPage dimension = encodedTablePage.getDimension(i);
+        int bufferSize = dimension.getSerializedSize();
+        buffer = dimension.serialize();
+        channel.write(buffer);
+        offset += bufferSize;
       }
-      dimensionOffset = offset;
-      int dataChunkStartIndex = nodeHolderList.get(0).getKeyArray().length;
-      for (int i = 0; i < numberOfMeasures; i++) {
-        nodeHolderList = dataWriterHolder.getNodeHolder();
-        currentDataChunksOffset.add(offset);
-        currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
-        buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]);
-        fileChannel.write(buffer);
-        offset += dataChunkBytes[dataChunkStartIndex].length;
-        dataChunkStartIndex++;
-        for (int j = 0; j < nodeHolderList.size(); j++) {
-          nodeHolder = nodeHolderList.get(j);
-          bufferSize = nodeHolder.getDataArray()[i].length;
-          buffer = ByteBuffer.wrap(nodeHolder.getDataArray()[i]);
-          fileChannel.write(buffer);
-          offset += bufferSize;
-        }
+    }
+    dimensionOffset = offset;
+    int dataChunkStartIndex = encodedTablePages.get(0).getNumDimensions();
+    for (int i = 0; i < numberOfMeasures; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
+      buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]);
+      channel.write(buffer);
+      offset += dataChunkBytes[dataChunkStartIndex].length;
+      dataChunkStartIndex++;
+      for (EncodedTablePage encodedTablePage : encodedTablePages) {
+        EncodedMeasurePage measure = encodedTablePage.getMeasure(i);
+        int bufferSize = measure.getSerializedSize();
+        buffer = measure.serialize();
+        channel.write(buffer);
+        offset += bufferSize;
       }
-      measureOffset = offset;
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the data", e);
     }
-    blockletIndex.add(CarbonMetadataUtil
-        .getBlockletIndex(nodeHolderList, dataWriterVo.getSegmentProperties().getMeasures()));
+    measureOffset = offset;
+    blockletIndex.add(
+        CarbonMetadataUtil.getBlockletIndex(
+            encodedTablePages, dataWriterVo.getSegmentProperties().getMeasures()));
     BlockletInfo3 blockletInfo3 =
         new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
-            dimensionOffset, measureOffset, dataWriterHolder.getNodeHolder().size());
+            dimensionOffset, measureOffset, dataWriterHolder.getEncodedTablePages().size());
     blockletMetadata.add(blockletInfo3);
   }
 
@@ -538,7 +340,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     closeExecutorService();
   }
 
-  @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
+  @Override public void writeFooterToFile() throws CarbonDataWriterException {
     if (this.blockletMetadata.size() > 0) {
       writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
     }
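
With this change the V3 writer buffers whole EncodedTablePages in DataWriterHolder and flushes them one blocklet at a time. A condensed view of the layout written by writeBlockletToFile (each column's DataChunk3 header followed by that column's page from every buffered table page) is:

  // Condensed view of the layout written by writeBlockletToFile above.
  for (int col = 0; col < numberOfDimension; col++) {
    channel.write(ByteBuffer.wrap(dataChunkBytes[col]));        // DataChunk3 header
    for (EncodedTablePage page : encodedTablePages) {
      channel.write(page.getDimension(col).serialize());        // this column's page, per table page
    }
  }
  for (int col = 0; col < numberOfMeasures; col++) {
    channel.write(ByteBuffer.wrap(dataChunkBytes[numberOfDimension + col]));
    for (EncodedTablePage page : encodedTablePages) {
      channel.write(page.getMeasure(col).serialize());
    }
  }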

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
index a98f388..246fa86 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
@@ -19,24 +19,24 @@ package org.apache.carbondata.processing.store.writer.v3;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.carbondata.core.util.NodeHolder;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 
 public class DataWriterHolder {
-  private List<NodeHolder> nodeHolder;
+  private List<EncodedTablePage> encodedTablePage;
   private long currentSize;
 
   public DataWriterHolder() {
-    this.nodeHolder = new ArrayList<NodeHolder>();
+    this.encodedTablePage = new ArrayList<EncodedTablePage>();
   }
 
   public void clear() {
-    nodeHolder.clear();
+    encodedTablePage.clear();
     currentSize = 0;
   }
 
-  public void addNodeHolder(NodeHolder holder) {
-    this.nodeHolder.add(holder);
-    currentSize += holder.getHolderSize();
+  public void addPage(EncodedTablePage encodedTablePage) {
+    this.encodedTablePage.add(encodedTablePage);
+    currentSize += encodedTablePage.getEncodedSize();
   }
 
   public long getSize() {
@@ -45,18 +45,18 @@ public class DataWriterHolder {
   }
 
   public int getNumberOfPagesAdded() {
-    return nodeHolder.size();
+    return encodedTablePage.size();
   }
 
   public int getTotalRows() {
     int rows = 0;
-    for (NodeHolder nh : nodeHolder) {
-      rows += nh.getEntryCount();
+    for (EncodedTablePage nh : encodedTablePage) {
+      rows += nh.getPageSize();
     }
     return rows;
   }
 
-  public List<NodeHolder> getNodeHolder() {
-    return nodeHolder;
+  public List<EncodedTablePage> getEncodedTablePages() {
+    return encodedTablePage;
   }
 }
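
DataWriterHolder now accumulates EncodedTablePages and their encoded sizes instead of NodeHolders. A minimal usage sketch, mirroring how CarbonFactDataWriterImplV3 drives it (blockletSize is an assumed threshold and flushBlocklet() is a placeholder for writeBlockletToFile()):

  // Minimal sketch; blockletSize and flushBlocklet() are assumptions for illustration.
  DataWriterHolder holder = new DataWriterHolder();
  if (holder.getSize() + page.getEncodedSize() >= blockletSize
      && holder.getNumberOfPagesAdded() > 0) {
    flushBlocklet(holder.getEncodedTablePages());
    holder.clear();
  }
  holder.addPage(page);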

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java
deleted file mode 100644
index c634e7c..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.util;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-
-/**
- * This is the utility class for No Dictionary changes.
- */
-public class NonDictionaryUtil {
-
-  /**
-   * This method will form one single byte [] for all the high card dims.
-   * For example if you need to pack 2 columns c1 and c2 , it stores in following way
-   *  <total_len(short)><offsetLen(short)><offsetLen+c1_len(short)><c1(byte[])><c2(byte[])>
-   * @param byteBufferArr
-   * @return
-   */
-  public static byte[] packByteBufferIntoSingleByteArray(byte[][] byteBufferArr) {
-    // for empty array means there is no data to remove dictionary.
-    if (null == byteBufferArr || byteBufferArr.length == 0) {
-      return null;
-    }
-    int noOfCol = byteBufferArr.length;
-    short toDetermineLengthOfByteArr = 2;
-    short offsetLen = (short) (noOfCol * 2 + toDetermineLengthOfByteArr);
-    int totalBytes = calculateTotalBytes(byteBufferArr) + offsetLen;
-
-    ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
-
-    // write the length of the byte [] as first short
-    buffer.putShort((short) (totalBytes - toDetermineLengthOfByteArr));
-    // writing the offset of the first element.
-    buffer.putShort(offsetLen);
-
-    // prepare index for byte []
-    for (int index = 0; index < byteBufferArr.length - 1; index++) {
-      int noOfBytes = byteBufferArr[index].length;
-
-      buffer.putShort((short) (offsetLen + noOfBytes));
-      offsetLen += noOfBytes;
-    }
-
-    // put actual data.
-    for (int index = 0; index < byteBufferArr.length; index++) {
-      buffer.put(byteBufferArr[index]);
-    }
-    buffer.rewind();
-    return buffer.array();
-
-  }
-
-  /**
-   * To calculate the total bytes in byte Buffer[].
-   *
-   * @param byteBufferArr
-   * @return
-   */
-  private static int calculateTotalBytes(byte[][] byteBufferArr) {
-    int total = 0;
-    for (int index = 0; index < byteBufferArr.length; index++) {
-      total += byteBufferArr[index].length;
-    }
-    return total;
-  }
-
-  /**
-   * Method to get the required Dimension from obj []
-   *
-   * @param index
-   * @param row
-   * @return
-   */
-  public static Integer getDimension(int index, Object[] row) {
-
-    Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
-
-    return dimensions[index];
-
-  }
-
-  /**
-   * Method to get the required measure from obj []
-   *
-   * @param index
-   * @param row
-   * @return
-   */
-  public static Object getMeasure(int index, Object[] row) {
-    Object[] measures = (Object[]) row[WriteStepRowUtil.MEASURE];
-    return measures[index];
-  }
-
-  public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
-
-    return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-  }
-
-  public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
-      Object[] measureArray) {
-
-    out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
-    out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
-    out[WriteStepRowUtil.MEASURE] = measureArray;
-
-  }
-
-  /**
-   * This method will extract the single dimension from the complete high card dims byte[].+     *
-   * The format of the byte [] will be,  Totallength,CompleteStartOffsets,Dat
-   *
-   * @param highCardArr
-   * @param index
-   * @param highCardinalityCount
-   * @param outBuffer
-   */
-  public static void extractSingleHighCardDims(byte[] highCardArr, int index,
-      int highCardinalityCount, ByteBuffer outBuffer) {
-    ByteBuffer buff = null;
-    short secIndex = 0;
-    short firstIndex = 0;
-    int length;
-    // if the requested index is a last one then we need to calculate length
-    // based on byte[] length.
-    if (index == highCardinalityCount - 1) {
-      // need to read 2 bytes(1 short) to determine starting offset and
-      // length can be calculated by array length.
-      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2);
-    } else {
-      // need to read 4 bytes(2 short) to determine starting offset and
-      // length.
-      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4);
-    }
-
-    firstIndex = buff.getShort();
-    // if it is a last dimension in high card then this will be last
-    // offset.so calculate length from total length
-    if (index == highCardinalityCount - 1) {
-      secIndex = (short) highCardArr.length;
-    } else {
-      secIndex = buff.getShort();
-    }
-
-    length = secIndex - firstIndex;
-
-    outBuffer.position(firstIndex);
-    outBuffer.limit(outBuffer.position() + length);
-
-  }
-}
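
The deleted packByteBufferIntoSingleByteArray documented its layout as <total_len(short)><offsetLen(short)><offsetLen+c1_len(short)><c1(byte[])><c2(byte[])>. As a worked example of that (now removed) format: packing two columns c1 (3 bytes) and c2 (5 bytes) gives offsetLen = 2*2 + 2 = 6 and totalBytes = 8 + 6 = 14, so the buffer holds the shorts 12 (totalBytes minus the 2-byte length field), 6 (offset of c1) and 9 (offset of c2), followed by the 3 bytes of c1 and the 5 bytes of c2.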

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 91cc195..cfba78b 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -165,7 +165,7 @@ public class StoreCreator {
       loadModel.setFactTimeStamp(System.currentTimeMillis());
       loadModel.setMaxColumns("10");
 
-      executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
+      loadData(loadModel, absoluteTableIdentifier.getStorePath());
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -355,7 +355,7 @@ public class StoreCreator {
    * @param storeLocation
    * @throws Exception
    */
-  public static void executeGraph(CarbonLoadModel loadModel, String storeLocation)
+  public static void loadData(CarbonLoadModel loadModel, String storeLocation)
       throws Exception {
     new File(storeLocation).mkdirs();
     String outPutLoc = storeLocation + "/etl";

