carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code
Date Sun, 26 Feb 2017 14:54:29 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 922683eb8 -> 740358c13


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index eafcd9f..289c156 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -59,10 +59,12 @@ import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.mdkeygen.file.FileData;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
@@ -137,8 +139,20 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * size reserved in one file for writing block meta data. It will be in percentage
    */
   private int spaceReservedForBlockMetaSize;
-  private FileOutputStream fileOutputStream;
-  private List<BlockIndexInfo> blockIndexInfoList;
+
+  protected FileOutputStream fileOutputStream;
+
+  protected List<BlockIndexInfo> blockIndexInfoList;
+
+  /**
+   * list of metadata for V3 format
+   */
+  protected List<BlockletInfo3> blockletMetadata;
+
+  /**
+   * list of blocklet index
+   */
+  protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
 
   public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
     this.dataWriterVo = dataWriterVo;
@@ -187,6 +201,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
     this.dataChunksOffsets = new ArrayList<>();
     this.dataChunksLength = new ArrayList<>();
+    blockletMetadata = new ArrayList<BlockletInfo3>();
+    blockletIndex = new ArrayList<>();
   }
 
   /**
@@ -242,12 +258,14 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       LOGGER.info("Writing data to file as max file size reached for file: " + fileName
           + " .Data block size: " + currentFileSize);
       // write meta data to end of the existing file
-      writeBlockletInfoToFile(blockletInfoList, fileChannel, fileName);
+      writeBlockletInfoToFile(fileChannel, fileName);
       this.currentFileSize = 0;
       blockletInfoList =
           new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
       this.dataChunksOffsets = new ArrayList<>();
       this.dataChunksLength = new ArrayList<>();
+      this.blockletMetadata = new ArrayList<>();
+      this.blockletIndex = new ArrayList<>();
       CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
       // rename carbon data file from in progress status to actual
       renameCarbonDataFile();
@@ -316,7 +334,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   /**
    * This method will write metadata at the end of the file in thrift format
    */
-  protected abstract void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList,
+  protected abstract void writeBlockletInfoToFile(
       FileChannel channel, String filePath) throws CarbonDataWriterException;
 
   /**
@@ -327,19 +345,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @param filePath        file path
    * @param currentPosition current offset
    */
-  protected void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+  protected void fillBlockIndexInfoDetails(long numberOfRows,
       String filePath, long currentPosition) {
 
     // as min-max will change for each blocklet and second blocklet min-max can be lesser than
     // the first blocklet so we need to calculate the complete block level min-max by taking
     // the min value of each column and max value of each column
-    byte[][] currentMinValue = infoList.get(0).getColumnMinData().clone();
-    byte[][] currentMaxValue = infoList.get(0).getColumnMaxData().clone();
+    byte[][] currentMinValue = blockletInfoList.get(0).getColumnMinData().clone();
+    byte[][] currentMaxValue = blockletInfoList.get(0).getColumnMaxData().clone();
     byte[][] minValue = null;
     byte[][] maxValue = null;
-    for (int i = 1; i < infoList.size(); i++) {
-      minValue = infoList.get(i).getColumnMinData();
-      maxValue = infoList.get(i).getColumnMaxData();
+    for (int i = 1; i < blockletInfoList.size(); i++) {
+      minValue = blockletInfoList.get(i).getColumnMinData();
+      maxValue = blockletInfoList.get(i).getColumnMaxData();
       for (int j = 0; j < maxValue.length; j++) {
         if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
           currentMinValue[j] = minValue[j].clone();
@@ -352,8 +370,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     // start and end key we can take based on first blocklet
     // start key will be the block start key as
     // it is the least key and end blocklet end key will be the block end key as it is the max key
-    BlockletBTreeIndex btree = new BlockletBTreeIndex(infoList.get(0).getStartKey(),
-        infoList.get(infoList.size() - 1).getEndKey());
+    BlockletBTreeIndex btree = new BlockletBTreeIndex(blockletInfoList.get(0).getStartKey(),
+        blockletInfoList.get(blockletInfoList.size() - 1).getEndKey());
     BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
     minmax.setMinValues(currentMinValue);
     minmax.setMaxValues(currentMaxValue);
@@ -414,7 +432,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws IOException               throws io exception if any problem while writing
    * @throws CarbonDataWriterException data writing
    */
-  private void writeIndexFile() throws IOException, CarbonDataWriterException {
+  protected void writeIndexFile() throws IOException, CarbonDataWriterException {
     // get the header
     IndexHeader indexHeader = CarbonMetadataUtil
         .getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber());
@@ -444,7 +462,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    *
    * @throws CarbonDataWriterException
    */
-  private void closeExecutorService() throws CarbonDataWriterException {
+  protected void closeExecutorService() throws CarbonDataWriterException {
     executorService.shutdown();
     try {
       executorService.awaitTermination(2, TimeUnit.HOURS);
@@ -467,7 +485,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    *
    * @throws CarbonDataWriterException
    */
-  private void renameCarbonDataFile() throws CarbonDataWriterException {
+  protected void renameCarbonDataFile() throws CarbonDataWriterException {
     File origFile = new File(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
     File curFile = new File(this.fileName);
     if (!curFile.renameTo(origFile)) {
@@ -481,7 +499,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @param localFileName local file name with full path
    * @throws CarbonDataWriterException
    */
-  private void copyCarbonDataFileToCarbonStorePath(String localFileName)
+  protected void copyCarbonDataFileToCarbonStorePath(String localFileName)
       throws CarbonDataWriterException {
     long copyStartTime = System.currentTimeMillis();
     LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath());
@@ -537,7 +555,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
     if (this.blockletInfoList.size() > 0) {
-      writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+      writeBlockletInfoToFile(fileChannel, fileName);
     }
   }
 
@@ -551,7 +569,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
 
-  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray,
+  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage[] keyStorageArray,
       int entryCount) {
     byte[][] keyBlockData = new byte[keyStorageArray.length][];
     int destPos = 0;

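The hunks above widen several private members and helpers of AbstractFactDataWriter to
protected and drop the infoList parameter from writeBlockletInfoToFile and
fillBlockIndexInfoDetails: subclasses now read the shared blockletInfoList field, which
lets the new V3 writer reuse the footer and index plumbing against its own state. The
block-level min/max computed in fillBlockIndexInfoDetails is a per-column fold over the
per-blocklet min/max arrays. A minimal sketch of that fold, assuming a plain unsigned
lexicographic byte comparison in place of ByteUtil.UnsafeComparer:

  import java.util.List;

  final class MinMaxFoldSketch {

    // Unsigned lexicographic compare; a stand-in for ByteUtil.UnsafeComparer (assumption).
    static int compare(byte[] a, byte[] b) {
      int len = Math.min(a.length, b.length);
      for (int i = 0; i < len; i++) {
        int x = a[i] & 0xFF;
        int y = b[i] & 0xFF;
        if (x != y) {
          return x - y;
        }
      }
      return a.length - b.length;
    }

    // Fold per-blocklet minima into block-level minima, one entry per column.
    // Maxima are symmetric, with the comparison reversed.
    static byte[][] blockMin(List<byte[][]> blockletMins) {
      byte[][] current = blockletMins.get(0).clone();
      for (int i = 1; i < blockletMins.size(); i++) {
        byte[][] candidate = blockletMins.get(i);
        for (int j = 0; j < candidate.length; j++) {
          if (compare(current[j], candidate[j]) > 0) {
            current[j] = candidate[j].clone();
          }
        }
      }
      return current;
    }
  }
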
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index ddcdfcd..94d2727 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.store.writer;
 
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 public interface CarbonFactDataWriter<T> {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
deleted file mode 100644
index 3b90869..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.writer;
-
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-
-public class NodeHolder {
-  /**
-   * keyArray
-   */
-  private byte[][] keyArray;
-
-  /**
-   * dataArray
-   */
-  private byte[][] dataArray;
-
-  /**
-   * measureLenght
-   */
-  private int[] measureLenght;
-
-  /**
-   * startKey
-   */
-  private byte[] startKey;
-
-  /**
-   * endKey
-   */
-  private byte[] endKey;
-
-  /**
-   * entryCount
-   */
-  private int entryCount;
-  /**
-   * keyLenghts
-   */
-  private int[] keyLengths;
-
-  /**
-   * dataAfterCompression
-   */
-  private short[][] dataAfterCompression;
-
-  /**
-   * indexMap
-   */
-  private short[][] indexMap;
-
-  /**
-   * keyIndexBlockLenght
-   */
-  private int[] keyBlockIndexLength;
-
-  /**
-   * isSortedKeyBlock
-   */
-  private boolean[] isSortedKeyBlock;
-
-  private byte[][] compressedIndex;
-
-  private byte[][] compressedIndexMap;
-
-  /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapLength;
-
-  /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapOffsets;
-
-  /**
-   * compressedDataIndex
-   */
-  private byte[][] compressedDataIndex;
-
-  /**
-   * column max data
-   */
-  private byte[][] columnMaxData;
-
-  /**
-   * column min data
-   */
-  private byte[][] columnMinData;
-
-  /**
-   * compression model for numbers data block.
-   */
-  private WriterCompressModel compressionModel;
-
-  /**
-   * array of aggBlocks flag to identify the aggBlocks
-   */
-  private boolean[] aggBlocks;
-
-  /**
-   * all columns max value
-   */
-  private byte[][] allMaxValue;
-
-  /**
-   * all column max value
-   */
-  private byte[][] allMinValue;
-
-  /**
-   * true if given index is colgroup block
-   */
-  private boolean[] colGrpBlock;
-
-  /**
-   * bit set which will holds the measure
-   * indexes which are null
-   */
-  private BitSet[] measureNullValueIndex;
-
-  /**
-   * total length of dimension values
-   */
-  private int totalDimensionArrayLength;
-
-  /**
-   * total length of all measure values
-   */
-  private int totalMeasureArrayLength;
-
-  /**
-   * @return the keyArray
-   */
-  public byte[][] getKeyArray() {
-    return keyArray;
-  }
-
-  /**
-   * @param keyArray the keyArray to set
-   */
-  public void setKeyArray(byte[][] keyArray) {
-    this.keyArray = keyArray;
-  }
-
-  /**
-   * @return the dataArray
-   */
-  public byte[][] getDataArray() {
-    return dataArray;
-  }
-
-  /**
-   * @param dataArray the dataArray to set
-   */
-  public void setDataArray(byte[][] dataArray) {
-    this.dataArray = dataArray;
-  }
-
-  /**
-   * @return the measureLenght
-   */
-  public int[] getMeasureLenght() {
-    return measureLenght;
-  }
-
-  /**
-   * @param measureLenght the measureLenght to set
-   */
-  public void setMeasureLenght(int[] measureLenght) {
-    this.measureLenght = measureLenght;
-  }
-
-  /**
-   * @return the startKey
-   */
-  public byte[] getStartKey() {
-    return startKey;
-  }
-
-  /**
-   * @param startKey the startKey to set
-   */
-  public void setStartKey(byte[] startKey) {
-    this.startKey = startKey;
-  }
-
-  /**
-   * @return the endKey
-   */
-  public byte[] getEndKey() {
-    return endKey;
-  }
-
-  /**
-   * @param endKey the endKey to set
-   */
-  public void setEndKey(byte[] endKey) {
-    this.endKey = endKey;
-  }
-
-  /**
-   * @return the entryCount
-   */
-  public int getEntryCount() {
-    return entryCount;
-  }
-
-  /**
-   * @param entryCount the entryCount to set
-   */
-  public void setEntryCount(int entryCount) {
-    this.entryCount = entryCount;
-  }
-
-  /**
-   * @return the keyLenghts
-   */
-  public int[] getKeyLengths() {
-    return keyLengths;
-  }
-
-  public void setKeyLengths(int[] keyLengths) {
-    this.keyLengths = keyLengths;
-  }
-
-  /**
-   * @return the keyBlockIndexLength
-   */
-  public int[] getKeyBlockIndexLength() {
-    return keyBlockIndexLength;
-  }
-
-  /**
-   * @param keyBlockIndexLength the keyBlockIndexLength to set
-   */
-  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
-    this.keyBlockIndexLength = keyBlockIndexLength;
-  }
-
-  /**
-   * @return the isSortedKeyBlock
-   */
-  public boolean[] getIsSortedKeyBlock() {
-    return isSortedKeyBlock;
-  }
-
-  /**
-   * @param isSortedKeyBlock the isSortedKeyBlock to set
-   */
-  public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
-    this.isSortedKeyBlock = isSortedKeyBlock;
-  }
-
-  /**
-   * @return the compressedIndexex
-   */
-  public byte[][] getCompressedIndex() {
-    return compressedIndex;
-  }
-
-  public void setCompressedIndex(byte[][] compressedIndex) {
-    this.compressedIndex = compressedIndex;
-  }
-
-  /**
-   * @return the compressedIndexMap
-   */
-  public byte[][] getCompressedIndexMap() {
-    return compressedIndexMap;
-  }
-
-  /**
-   * @param compressedIndexMap the compressedIndexMap to set
-   */
-  public void setCompressedIndexMap(byte[][] compressedIndexMap) {
-    this.compressedIndexMap = compressedIndexMap;
-  }
-
-  /**
-   * @return the compressedDataIndex
-   */
-  public byte[][] getCompressedDataIndex() {
-    return compressedDataIndex;
-  }
-
-  /**
-   * @param compressedDataIndex the compressedDataIndex to set
-   */
-  public void setCompressedDataIndex(byte[][] compressedDataIndex) {
-    this.compressedDataIndex = compressedDataIndex;
-  }
-
-  /**
-   * @return the dataIndexMapLength
-   */
-  public int[] getDataIndexMapLength() {
-    return dataIndexMapLength;
-  }
-
-  /**
-   * @param dataIndexMapLength the dataIndexMapLength to set
-   */
-  public void setDataIndexMapLength(int[] dataIndexMapLength) {
-    this.dataIndexMapLength = dataIndexMapLength;
-  }
-
-  public byte[][] getColumnMaxData() {
-    return this.columnMaxData;
-  }
-
-  public void setColumnMaxData(byte[][] columnMaxData) {
-    this.columnMaxData = columnMaxData;
-  }
-
-  public byte[][] getColumnMinData() {
-    return this.columnMinData;
-  }
-
-  public void setColumnMinData(byte[][] columnMinData) {
-    this.columnMinData = columnMinData;
-  }
-
-  public WriterCompressModel getCompressionModel() {
-    return compressionModel;
-  }
-
-  public void setCompressionModel(WriterCompressModel compressionModel) {
-    this.compressionModel = compressionModel;
-  }
-
-  /**
-   * returns array of aggBlocks flag to identify the aag blocks
-   *
-   * @return
-   */
-  public boolean[] getAggBlocks() {
-    return aggBlocks;
-  }
-
-  /**
-   * set array of aggBlocks flag to identify the aggBlocks
-   *
-   * @param aggBlocks
-   */
-  public void setAggBlocks(boolean[] aggBlocks) {
-    this.aggBlocks = aggBlocks;
-  }
-
-  /**
-   * @return
-   */
-  public boolean[] getColGrpBlocks() {
-    return this.colGrpBlock;
-  }
-
-  /**
-   * @param colGrpBlock true if block is column group
-   */
-  public void setColGrpBlocks(boolean[] colGrpBlock) {
-    this.colGrpBlock = colGrpBlock;
-  }
-
-  /**
-   * @return the measureNullValueIndex
-   */
-  public BitSet[] getMeasureNullValueIndex() {
-    return measureNullValueIndex;
-  }
-
-  /**
-   * @param measureNullValueIndex the measureNullValueIndex to set
-   */
-  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
-    this.measureNullValueIndex = measureNullValueIndex;
-  }
-
-  public int getTotalDimensionArrayLength() {
-    return totalDimensionArrayLength;
-  }
-
-  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
-    this.totalDimensionArrayLength = totalDimensionArrayLength;
-  }
-
-  public int getTotalMeasureArrayLength() {
-    return totalMeasureArrayLength;
-  }
-
-  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
-    this.totalMeasureArrayLength = totalMeasureArrayLength;
-  }
-}

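The deletion above is a move rather than a removal: the import hunks in the other files
of this commit show NodeHolder relocated to org.apache.carbondata.core.util, so that
core-side utilities such as CarbonMetadataUtil.getDataChunk3 can accept it without a
dependency on the processing module. The relocated class also gains the measure-level
min/max setters (setMeasureColumnMaxData / setMeasureColumnMinData) used by the V3
writer below. Call sites only need to swap the import:

  // before this commit (class deleted above)
  // import org.apache.carbondata.processing.store.writer.NodeHolder;

  // after this commit (per the import hunks in the other files)
  import org.apache.carbondata.core.util.NodeHolder;
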
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index b7f8b4f..3218a51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.store.writer.v1;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -29,12 +28,12 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
@@ -363,15 +362,15 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
   /**
    * This method will write metadata at the end of the file in thrift format
    */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+  protected void writeBlockletInfoToFile(FileChannel channel,
       String filePath) throws CarbonDataWriterException {
     try {
       long currentPosition = channel.size();
       CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
       FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFileFooter(infoList, localCardinality.length, localCardinality,
+          .convertFileFooter(blockletInfoList, localCardinality.length, localCardinality,
               thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
       writer.writeFooter(convertFileMeta, currentPosition);
     } catch (IOException e) {
       throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);

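V1's override now serializes the shared blockletInfoList into the thrift FileFooter and
writes it at the current end of the data file; the position captured from channel.size()
before the write is what lets a reader locate the footer later. A minimal sketch of the
append-footer-and-remember-offset pattern, assuming the footer bytes are already
thrift-serialized and leaving aside how the offset is surfaced to readers:

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.nio.channels.FileChannel;
  import java.nio.file.Path;
  import java.nio.file.StandardOpenOption;

  final class FooterAppendSketch {

    // Appends the serialized footer at EOF and returns the offset it was written at,
    // mirroring the channel.size() / writer.writeFooter(convertFileMeta, currentPosition)
    // sequence in the hunk above.
    static long appendFooter(Path file, byte[] footerBytes) throws IOException {
      try (FileChannel ch = FileChannel.open(file,
          StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
        long footerOffset = ch.size();
        ch.write(ByteBuffer.wrap(footerBytes));
        return footerOffset;
      }
    }
  }
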
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index 185d98d..a9c4ce0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -30,11 +30,11 @@ import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
 
@@ -265,7 +265,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
   /**
    * This method will write metadata at the end of the file in thrift format
    */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+  protected void writeBlockletInfoToFile(FileChannel channel,
       String filePath) throws CarbonDataWriterException {
     try {
       // get the current file position
@@ -273,10 +273,10 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
       CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
       // get thrift file footer instance
       FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+          .convertFilterFooter2(blockletInfoList, localCardinality, thriftColumnSchemaList,
               dataChunksOffsets, dataChunksLength);
       // fill the carbon index details
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
       // write the footer
       writer.writeFooter(convertFileMeta, currentPosition);
     } catch (IOException e) {

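The V2 path differs from V1 mainly in the footer converter: convertFilterFooter2 also
embeds dataChunksOffsets and dataChunksLength, the per-chunk positions recorded while the
data was written, so a reader can seek to each column chunk directly. A minimal sketch of
that bookkeeping, with simplified types:

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.nio.channels.FileChannel;
  import java.util.ArrayList;
  import java.util.List;

  final class ChunkOffsetTrackingSketch {

    private final List<Long> dataChunksOffsets = new ArrayList<>();
    private final List<Integer> dataChunksLength = new ArrayList<>();

    // Record where each chunk starts and how long it is before writing it,
    // so the footer can carry a seek table for the chunks.
    void writeChunk(FileChannel channel, byte[] chunk) throws IOException {
      dataChunksOffsets.add(channel.position());
      dataChunksLength.add(chunk.length);
      channel.write(ByteBuffer.wrap(chunk));
    }
  }
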
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
new file mode 100644
index 0000000..fa7bf27
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.BlockletInfo3;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * Below class will be used to write the data in V3 format
+ */
+public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> {
+
+  /**
+   * number of pages per column in one blocklet
+   */
+  private int numberOfChunksInBlocklet;
+
+  /**
+   * persist the page data to be written in the file
+   */
+  private DataWriterHolder dataWriterHolder;
+
+  public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+    this.numberOfChunksInBlocklet = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+              CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE));
+    dataWriterHolder = new DataWriterHolder();
+  }
+
+  /**
+   * Below method will be used to build the node holder object
+   * This node holder object will be used to persist data which will
+   * be written in carbon data file
+   */
+  @Override public NodeHolder buildDataNodeHolder(IndexStorage<short[]>[] keyStorageArray,
+      byte[][] dataArray, int entryCount, byte[] startKey, byte[] endKey,
+      WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
+      throws CarbonDataWriterException {
+    // if there are no NO-Dictionary column present in the table then
+    // set the empty byte array
+    if (null == noDictionaryEndKey) {
+      noDictionaryEndKey = new byte[0];
+    }
+    if (null == noDictionaryStartKey) {
+      noDictionaryStartKey = new byte[0];
+    }
+    // total measure length;
+    int totalMsrArrySize = 0;
+    // current measure length;
+    int currentMsrLength = 0;
+    int totalKeySize = 0;
+    int keyBlockSize = 0;
+
+    boolean[] isSortedData = new boolean[keyStorageArray.length];
+    int[] keyLengths = new int[keyStorageArray.length];
+
+    // below will calculate min and max value for each column
+    // for below 2d array, first index will be for column and second will be min
+    // max
+    // value for same column
+    // byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
+
+    byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
+    byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];
+
+    byte[][] measureMinValue = new byte[dataArray.length][];
+    byte[][] measureMaxValue = new byte[dataArray.length][];
+
+    byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
+    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
+
+    for (int i = 0; i < keyLengths.length; i++) {
+      keyLengths[i] = keyBlockData[i].length;
+      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+      keyBlockSize++;
+      totalKeySize += keyLengths[i];
+      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
+        dimensionMinValue[i] = keyStorageArray[i].getMin();
+        dimensionMaxValue[i] = keyStorageArray[i].getMax();
+      } else {
+        dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+        dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+      }
+      // if keyStorageArray is an instance of ColGroupBlockStorage then it's a
+      // colGroup chunk
+      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
+        colGrpBlock[i] = true;
+      }
+    }
+    for (int i = 0; i < dataArray.length; i++) {
+      measureMaxValue[i] = CarbonMetadataUtil
+          .getByteValueForMeasure(compressionModel.getMaxValue()[i],
+              dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
+      measureMinValue[i] = CarbonMetadataUtil
+          .getByteValueForMeasure(compressionModel.getMinValue()[i],
+              dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
+    }
+    int[] keyBlockIdxLengths = new int[keyBlockSize];
+    byte[][] dataAfterCompression = new byte[keyBlockSize][];
+    byte[][] indexMap = new byte[keyBlockSize][];
+    for (int i = 0; i < isSortedData.length; i++) {
+      if (!isSortedData[i]) {
+        dataAfterCompression[i] = getByteArray(keyStorageArray[i].getDataAfterComp());
+        if (null != keyStorageArray[i].getIndexMap()
+            && keyStorageArray[i].getIndexMap().length > 0) {
+          indexMap[i] = getByteArray(keyStorageArray[i].getIndexMap());
+        } else {
+          indexMap[i] = new byte[0];
+        }
+        keyBlockIdxLengths[i] = (dataAfterCompression[i].length + indexMap[i].length)
+            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      }
+    }
+    byte[][] compressedDataIndex = new byte[keyBlockSize][];
+    int[] dataIndexMapLength = new int[keyBlockSize];
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
+        try {
+          compressedDataIndex[i] = getByteArray(keyStorageArray[i].getDataIndexMap());
+          dataIndexMapLength[i] = compressedDataIndex[i].length;
+        } catch (Exception e) {
+          throw new CarbonDataWriterException(e.getMessage());
+        }
+      }
+    }
+    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
+    // calculate the total size required for all the measures and record
+    // each measure's size
+    for (int i = 0; i < dataArray.length; i++) {
+      currentMsrLength = dataArray[i].length;
+      totalMsrArrySize += currentMsrLength;
+      msrLength[i] = currentMsrLength;
+    }
+    NodeHolder holder = new NodeHolder();
+    holder.setDataArray(dataArray);
+    holder.setKeyArray(keyBlockData);
+    // end key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+    ByteBuffer buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + endKey.length + updatedNoDictionaryEndKey.length);
+    buffer.putInt(endKey.length);
+    buffer.putInt(updatedNoDictionaryEndKey.length);
+    buffer.put(endKey);
+    buffer.put(updatedNoDictionaryEndKey);
+    buffer.rewind();
+    holder.setEndKey(buffer.array());
+    holder.setMeasureLenght(msrLength);
+    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+    // start key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + startKey.length + updatedNoDictionaryStartKey.length);
+    buffer.putInt(startKey.length);
+    buffer.putInt(updatedNoDictionaryStartKey.length);
+    buffer.put(startKey);
+    buffer.put(updatedNoDictionaryStartKey);
+    buffer.rewind();
+    holder.setStartKey(buffer.array());
+    holder.setEntryCount(entryCount);
+    holder.setKeyLengths(keyLengths);
+    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
+    holder.setIsSortedKeyBlock(isSortedData);
+    holder.setCompressedIndex(dataAfterCompression);
+    holder.setCompressedIndexMap(indexMap);
+    holder.setDataIndexMapLength(dataIndexMapLength);
+    holder.setCompressedDataIndex(compressedDataIndex);
+    holder.setCompressionModel(compressionModel);
+    holder.setTotalDimensionArrayLength(totalKeySize);
+    holder.setTotalMeasureArrayLength(totalMsrArrySize);
+    holder.setMeasureColumnMaxData(measureMaxValue);
+    holder.setMeasureColumnMinData(measureMinValue);
+    // setting column min max value
+    holder.setColumnMaxData(dimensionMaxValue);
+    holder.setColumnMinData(dimensionMinValue);
+    holder.setAggBlocks(dataWriterVo.getAggBlocks());
+    holder.setColGrpBlocks(colGrpBlock);
+    return holder;
+  }
+
+  /**
+   * Below method will be used to convert short array to byte array
+   *
+   * @param data input short array
+   * @return byte array
+   */
+  private byte[] getByteArray(short[] data) {
+    ByteBuffer buffer = ByteBuffer.allocate(data.length * 2);
+    for (int i = 0; i < data.length; i++) {
+      buffer.putShort(data[i]);
+    }
+    buffer.flip();
+    return buffer.array();
+  }
+
+  @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
+      throws CarbonDataWriterException {
+    try {
+      // get the current file position
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      // get thrift file footer instance
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFileFooter3(blockletMetadata, blockletIndex, localCardinality,
+              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+      // fill the carbon index details
+      fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
+      // write the footer
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
+
+  /**
+   * Below method will be used to write blocklet data to file
+   */
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    // check the number of pages buffered in the data holder; once the count reaches
+    // the blocklet threshold, flush the buffered pages to the file before adding more
+    if (dataWriterHolder.getNumberOfPagesAdded() == numberOfChunksInBlocklet) {
+      writeDataToFile(fileChannel);
+    }
+    dataWriterHolder.addNodeHolder(holder);
+  }
+
+  private void writeDataToFile(FileChannel channel) {
+    // get the list of buffered node holders
+    List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
+    long blockletDataSize = 0;
+    // get data chunks for all the columns
+    byte[][] dataChunkBytes =
+        new byte[nodeHolderList.get(0).getKeyArray().length + nodeHolderList.get(0)
+            .getDataArray().length][];
+    int measureStartIndex = nodeHolderList.get(0).getKeyArray().length;
+    // calculate the size of data chunks
+    try {
+      for (int i = 0; i < nodeHolderList.get(0).getKeyArray().length; i++) {
+        dataChunkBytes[i] = CarbonUtil.getByteArray(CarbonMetadataUtil
+            .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
+                dataWriterVo.getSegmentProperties(), i, true));
+        blockletDataSize += dataChunkBytes[i].length;
+      }
+      for (int i = 0; i < nodeHolderList.get(0).getDataArray().length; i++) {
+        dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(CarbonMetadataUtil
+            .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
+                dataWriterVo.getSegmentProperties(), i, false));
+        blockletDataSize += dataChunkBytes[measureStartIndex].length;
+        measureStartIndex++;
+      }
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    }
+    // calculate the total size of data to be written
+    blockletDataSize += dataWriterHolder.getSize();
+    // if the data size would exceed the block size, roll over to a new file
+    updateBlockletFileChannel(blockletDataSize);
+    // write data to file
+    writeDataToFile(fileChannel, dataChunkBytes);
+    // clear the data holder
+    dataWriterHolder.clear();
+  }
+
+  /**
+   * Below method will be used to write data in carbon data file
+   * Data Format
+   * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+   * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+   * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+   * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+   * Each page will contain column data, Inverted index and rle index
+   *
+   * @param channel
+   * @param dataChunkBytes
+   */
+  private void writeDataToFile(FileChannel channel, byte[][] dataChunkBytes) {
+    long offset = 0;
+    // write the header
+    try {
+      if (fileChannel.size() == 0) {
+        ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+        ByteBuffer buffer = ByteBuffer.allocate(header.length);
+        buffer.put(header);
+        buffer.rewind();
+        fileChannel.write(buffer);
+      }
+      offset = channel.size();
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+    }
+    // to maintain the offset of each data chunk in blocklet
+    List<Long> currentDataChunksOffset = new ArrayList<>();
+    // to maintain the length of each data chunk in blocklet
+    List<Integer> currentDataChunksLength = new ArrayList<>();
+    // get the node holder list
+    List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
+    int numberOfDimension = nodeHolderList.get(0).getKeyArray().length;
+    int numberOfMeasures = nodeHolderList.get(0).getDataArray().length;
+    NodeHolder nodeHolder = null;
+    ByteBuffer buffer = null;
+    int bufferSize = 0;
+    long dimensionOffset = 0;
+    long measureOffset = 0;
+    int numberOfRows = 0;
+    // calculate the number of rows in each blocklet
+    for (int j = 0; j < nodeHolderList.size(); j++) {
+      numberOfRows += nodeHolderList.get(j).getEntryCount();
+    }
+    try {
+      for (int i = 0; i < numberOfDimension; i++) {
+        currentDataChunksOffset.add(offset);
+        currentDataChunksLength.add(dataChunkBytes[i].length);
+        buffer = ByteBuffer.allocate(dataChunkBytes[i].length);
+        buffer.put(dataChunkBytes[i]);
+        buffer.flip();
+        fileChannel.write(buffer);
+        offset += dataChunkBytes[i].length;
+        for (int j = 0; j < nodeHolderList.size(); j++) {
+          nodeHolder = nodeHolderList.get(j);
+          bufferSize = nodeHolder.getKeyLengths()[i] + (!nodeHolder.getIsSortedKeyBlock()[i] ?
+              nodeHolder.getKeyBlockIndexLength()[i] :
+              0) + (dataWriterVo.getAggBlocks()[i] ?
+              nodeHolder.getCompressedDataIndex()[i].length :
+              0);
+          buffer = ByteBuffer.allocate(bufferSize);
+          buffer.put(nodeHolder.getKeyArray()[i]);
+          if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+            buffer.putInt(nodeHolder.getCompressedIndex()[i].length);
+            buffer.put(nodeHolder.getCompressedIndex()[i]);
+            if (nodeHolder.getCompressedIndexMap()[i].length > 0) {
+              buffer.put(nodeHolder.getCompressedIndexMap()[i]);
+            }
+          }
+          if (nodeHolder.getAggBlocks()[i]) {
+            buffer.put(nodeHolder.getCompressedDataIndex()[i]);
+          }
+          buffer.flip();
+          fileChannel.write(buffer);
+          offset += bufferSize;
+        }
+      }
+      dimensionOffset = offset;
+      int dataChunkStartIndex = nodeHolderList.get(0).getKeyArray().length;
+      for (int i = 0; i < numberOfMeasures; i++) {
+        nodeHolderList = dataWriterHolder.getNodeHolder();
+        currentDataChunksOffset.add(offset);
+        currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
+        buffer = ByteBuffer.allocate(dataChunkBytes[dataChunkStartIndex].length);
+        buffer.put(dataChunkBytes[dataChunkStartIndex]);
+        buffer.flip();
+        fileChannel.write(buffer);
+        offset += dataChunkBytes[dataChunkStartIndex].length;
+        dataChunkStartIndex++;
+        for (int j = 0; j < nodeHolderList.size(); j++) {
+          nodeHolder = nodeHolderList.get(j);
+          bufferSize = nodeHolder.getDataArray()[i].length;
+          buffer = ByteBuffer.allocate(bufferSize);
+          buffer.put(nodeHolder.getDataArray()[i]);
+          buffer.flip();
+          fileChannel.write(buffer);
+          offset += bufferSize;
+        }
+      }
+      measureOffset = offset;
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the data", e);
+    }
+    blockletIndex.add(CarbonMetadataUtil
+        .getBlockletIndex(nodeHolderList, dataWriterVo.getSegmentProperties().getMeasures()));
+    BlockletInfo3 blockletInfo3 =
+        new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
+            dimensionOffset, measureOffset);
+    blockletMetadata.add(blockletInfo3);
+  }
+
+  /**
+   * Below method will be used to fill the block info details
+   *
+   * @param numberOfRows    number of rows in file
+   * @param filePath        file path
+   * @param currentPosition current offset
+   */
+  protected void fillBlockIndexInfoDetails(long numberOfRows, String filePath,
+      long currentPosition) {
+    byte[][] currentMinValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
+    byte[][] currentMaxValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
+    for (int i = 0; i < currentMaxValue.length; i++) {
+      currentMinValue[i] = blockletIndex.get(0).min_max_index.getMin_values().get(i).array();
+      currentMaxValue[i] = blockletIndex.get(0).min_max_index.getMax_values().get(i).array();
+    }
+    byte[] minValue = null;
+    byte[] maxValue = null;
+    int measureStartIndex = currentMinValue.length - dataWriterVo.getMeasureCount();
+    for (int i = 1; i < blockletIndex.size(); i++) {
+      for (int j = 0; j < measureStartIndex; j++) {
+        minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
+        maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue) > 0) {
+          currentMinValue[j] = minValue.clone();
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue) < 0) {
+          currentMaxValue[j] = maxValue.clone();
+        }
+      }
+      int measureIndex = 0;
+      for (int j = measureStartIndex; j < currentMinValue.length; j++) {
+        minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
+        maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+
+        if (CarbonMetadataUtil.compareMeasureData(currentMinValue[j], minValue,
+            dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
+            > 0) {
+          currentMinValue[j] = minValue.clone();
+        }
+        if (CarbonMetadataUtil.compareMeasureData(currentMaxValue[j], maxValue,
+            dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
+            < 0) {
+          currentMaxValue[j] = maxValue.clone();
+        }
+      }
+    }
+    BlockletBTreeIndex btree =
+        new BlockletBTreeIndex(blockletIndex.get(0).b_tree_index.getStart_key(),
+            blockletIndex.get(blockletIndex.size() - 1).b_tree_index.getEnd_key());
+    BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
+    minmax.setMinValues(currentMinValue);
+    minmax.setMaxValues(currentMaxValue);
+    org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex blockletIndex =
+        new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(btree, minmax);
+    BlockIndexInfo blockIndexInfo =
+        new BlockIndexInfo(numberOfRows, filePath.substring(0, filePath.lastIndexOf('.')),
+            currentPosition, blockletIndex);
+    blockIndexInfoList.add(blockIndexInfo);
+  }
+
+  /**
+   * Method will be used to close the open file channel
+   *
+   * @throws CarbonDataWriterException
+   */
+  public void closeWriter() throws CarbonDataWriterException {
+    if (dataWriterHolder.getNodeHolder().size() > 0) {
+      writeDataToFile(fileChannel);
+      writeBlockletInfoToFile(fileChannel, fileName);
+      CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+      renameCarbonDataFile();
+      copyCarbonDataFileToCarbonStorePath(
+          this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+      try {
+        writeIndexFile();
+      } catch (IOException e) {
+        throw new CarbonDataWriterException("Problem while writing the index file", e);
+      }
+    }
+    closeExecutorService();
+  }
+}

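The V3 writer buffers whole pages (NodeHolder instances) and lays each blocklet out
column-major, as the javadoc on writeDataToFile describes: for every column, the
serialized DataChunk3 metadata first, then that column's bytes from each buffered page,
with offsets and lengths collected into BlockletInfo3. A minimal sketch of that layout
loop, using simplified stand-in types (chunkHeaders[i] as the thrift-encoded chunk of
column i, and pages[i][j] as column i's bytes from page j, are both assumptions):

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.nio.channels.FileChannel;
  import java.util.ArrayList;
  import java.util.List;

  final class V3LayoutSketch {

    // Writes one blocklet: per column, the chunk metadata followed by all of its pages.
    // Returns the chunk start offsets, mirroring currentDataChunksOffset above.
    static List<Long> writeBlocklet(FileChannel channel,
        byte[][] chunkHeaders, byte[][][] pages) throws IOException {
      List<Long> chunkOffsets = new ArrayList<>();
      for (int col = 0; col < chunkHeaders.length; col++) {
        chunkOffsets.add(channel.position());
        channel.write(ByteBuffer.wrap(chunkHeaders[col]));
        for (byte[] page : pages[col]) {
          channel.write(ByteBuffer.wrap(page));
        }
      }
      return chunkOffsets;
    }
  }
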
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
new file mode 100644
index 0000000..0827bd0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.util.NodeHolder;
+
+public class DataWriterHolder {
+  private List<NodeHolder> nodeHolder;
+  private long currentSize;
+
+  public DataWriterHolder() {
+    this.nodeHolder = new ArrayList<NodeHolder>();
+  }
+
+  public void clear() {
+    nodeHolder.clear();
+    currentSize = 0;
+  }
+
+  public void addNodeHolder(NodeHolder holder) {
+    this.nodeHolder.add(holder);
+
+    int size = 0;
+    // add row id index length
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      if (!holder.getIsSortedKeyBlock()[i]) {
+        size += holder.getKeyBlockIndexLength()[i];
+      }
+    }
+    // add rle index length
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      if (holder.getAggBlocks()[i]) {
+        size += holder.getDataIndexMapLength()[i];
+      }
+    }
+    currentSize +=
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+  }
+
+  public long getSize() {
+    return currentSize;
+  }
+
+  public int getNumberOfPagesAdded() {
+    return nodeHolder.size();
+  }
+
+  public List<NodeHolder> getNodeHolder() {
+    return nodeHolder;
+  }
+}

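DataWriterHolder is the page buffer behind the V3 writer: addNodeHolder tracks the
payload size (dimension and measure bytes plus the row-id and rle index lengths where
present), and the writer flushes once getNumberOfPagesAdded reaches the configured page
count. A minimal usage sketch, with flush() standing in for writeDataToFile(fileChannel):

  import org.apache.carbondata.core.util.NodeHolder;

  final class BlockletBufferSketch {

    private final DataWriterHolder holder = new DataWriterHolder();
    // Illustrative value; the writer reads NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN instead.
    private final int pagesPerBlocklet = 4;

    // Mirrors writeBlockletData above: flush a full blocklet before buffering the next page.
    void add(NodeHolder page) {
      if (holder.getNumberOfPagesAdded() == pagesPerBlocklet) {
        flush();
      }
      holder.addNodeHolder(page);
    }

    private void flush() {
      // In the real writer this is writeDataToFile(fileChannel), which also clears the holder.
      holder.clear();
    }
  }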
