Repository: incubator-carbondata
Updated Branches:
refs/heads/master 9ad98f432 -> 7213ac057
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 16bd771..279bb63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -26,6 +26,7 @@ import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
@@ -52,22 +52,21 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSche
import org.apache.carbondata.core.carbon.path.CarbonStorePath;
import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
+import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression;
import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonMergerUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
-import org.apache.carbondata.format.FileFooter;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.mdkeygen.file.FileData;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.commons.lang3.ArrayUtils;
@@ -87,10 +86,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
private static final int HDFS_CHECKSUM_LENGTH = 512;
/**
- * measure count
- */
- protected int measureCount;
- /**
* file channel
*/
protected FileChannel fileChannel;
@@ -98,16 +93,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* this will be used for holding blocklet metadata
*/
protected List<BlockletInfoColumnar> blockletInfoList;
- /**
- * keyBlockSize
- */
- protected int[] keyBlockSize;
protected boolean[] isNoDictionary;
/**
- * mdkeySize
- */
- protected int mdkeySize;
- /**
* file name
*/
protected String fileName;
@@ -115,15 +102,14 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* Local cardinality for the segment
*/
protected int[] localCardinality;
- protected String databaseName;
/**
* thrift column schema
*/
protected List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchemaList;
- /**
- * tabel name
- */
- private String tableName;
+ protected NumberCompressor numberCompressor;
+ protected CarbonDataWriterVo dataWriterVo;
+ protected List<List<Long>> dataChunksOffsets;
+ protected List<List<Short>> dataChunksLength;
/**
* data file size;
*/
@@ -133,32 +119,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
private int fileCount;
/**
- * File manager
- */
- private IFileManagerComposite fileManager;
- /**
- * Store Location
- */
- private String storeLocation;
- /**
* executorService
*/
private ExecutorService executorService;
-
/**
   * list of futures submitted to the executorService
*/
private List<Future<Void>> executorServiceSubmitList;
- /**
- * data file attributes which will used for file construction
- */
- private CarbonDataFileAttributes carbonDataFileAttributes;
private CarbonTablePath carbonTablePath;
/**
- * data directory location in carbon store path
- */
- private String carbonDataDirectoryPath;
- /**
* data block size for one carbon data file
*/
private long dataBlockSize;
@@ -171,68 +140,55 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
private int spaceReservedForBlockMetaSize;
private FileOutputStream fileOutputStream;
-
- private SegmentProperties segmentProperties;
-
private List<BlockIndexInfo> blockIndexInfoList;
- public AbstractFactDataWriter(String storeLocation, int measureCount, int mdKeyLength,
- String databaseName, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize,
- CarbonDataFileAttributes carbonDataFileAttributes, List<ColumnSchema> columnSchema,
- String carbonDataDirectoryPath, int[] colCardinality, SegmentProperties segmentProperties,
- int blocksize) {
-
- // measure count
- this.measureCount = measureCount;
- // table name
- this.tableName = tableName;
- this.databaseName = databaseName;
-
- this.databaseName = databaseName;
-
- this.storeLocation = storeLocation;
- this.segmentProperties = segmentProperties;
+ public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
+ this.dataWriterVo = dataWriterVo;
this.blockletInfoList =
new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
blockIndexInfoList = new ArrayList<>();
// get max file size;
CarbonProperties propInstance = CarbonProperties.getInstance();
    // if blocksize=2048, then 2048*1024*1024 will exceed the range of int
- this.fileSizeInBytes = (long) blocksize * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
- * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
+ this.fileSizeInBytes =
+ (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
+ * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
this.spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
.getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize);
- this.fileManager = fileManager;
- this.carbonDataDirectoryPath = carbonDataDirectoryPath;
- this.keyBlockSize = keyBlockSize;
- this.mdkeySize = mdKeyLength;
+
this.executorService = Executors.newFixedThreadPool(1);
executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
// in case of compaction we will pass the cardinality.
- this.localCardinality = colCardinality;
- this.carbonDataFileAttributes = carbonDataFileAttributes;
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
- carbonTablePath =
- CarbonStorePath.getCarbonTablePath(storeLocation, carbonTable.getCarbonTableIdentifier());
+ this.localCardinality = dataWriterVo.getColCardinality();
+ CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+ dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo
+ .getTableName());
+ carbonTablePath = CarbonStorePath.getCarbonTablePath(dataWriterVo.getStoreLocation(),
+ carbonTable.getCarbonTableIdentifier());
//TODO: We should delete the levelmetadata file after reading here.
// so only data loading flow will need to read from cardinality file.
if (null == this.localCardinality) {
- this.localCardinality =
- CarbonMergerUtil.getCardinalityFromLevelMetadata(storeLocation, tableName);
+ this.localCardinality = CarbonMergerUtil
+ .getCardinalityFromLevelMetadata(dataWriterVo.getStoreLocation(),
+ dataWriterVo.getTableName());
List<Integer> cardinalityList = new ArrayList<Integer>();
- thriftColumnSchemaList =
- getColumnSchemaListAndCardinality(cardinalityList, localCardinality, columnSchema);
+ thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
+ dataWriterVo.getWrapperColumnSchemaList());
localCardinality =
ArrayUtils.toPrimitive(cardinalityList.toArray(new Integer[cardinalityList.size()]));
} else { // for compaction case
List<Integer> cardinalityList = new ArrayList<Integer>();
- thriftColumnSchemaList =
- getColumnSchemaListAndCardinality(cardinalityList, localCardinality, columnSchema);
+ thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
+ dataWriterVo.getWrapperColumnSchemaList());
}
+ this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+ CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
+ this.dataChunksOffsets = new ArrayList<>();
+ this.dataChunksLength = new ArrayList<>();
}
/**
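To make the sizing arithmetic in the constructor above concrete: the table
block size is given in MB, and a slice of it is reserved for block metadata.
A hedged worked example (the 10 percent figure is illustrative; the real
default comes from CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT):

long fileSizeInBytes = 2048L * 1024 * 1024;        // 2147483648 bytes
int spaceReservedForBlockMetaSize = 10;            // assumed percent value
long dataBlockSize = fileSizeInBytes
    - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
// dataBlockSize = 1932735284 bytes, roughly 1.8 GB left for actual data
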
@@ -258,8 +214,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
String readableFileSize = ByteUtil.convertByteToReadable(fileSize);
String readableMaxSize = ByteUtil.convertByteToReadable(maxSize);
LOGGER.info("The configured block size is " + readableBlockSize +
- ", the actual carbon file size is " + readableFileSize +
- ", choose the max value " + readableMaxSize + " as the block size on HDFS");
+ ", the actual carbon file size is " + readableFileSize +
+ ", choose the max value " + readableMaxSize + " as the block size on HDFS");
return maxSize;
}
@@ -291,6 +247,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.currentFileSize = 0;
blockletInfoList =
new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ this.dataChunksOffsets = new ArrayList<>();
+ this.dataChunksLength = new ArrayList<>();
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
// rename carbon data file from in progress status to actual
renameCarbonDataFile();
@@ -312,12 +270,12 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
// increment the file sequence counter
initFileCount();
String carbonDataFileName = carbonTablePath
- .getCarbonDataFileName(fileCount, carbonDataFileAttributes.getTaskId(),
- carbonDataFileAttributes.getFactTimeStamp());
+ .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+ dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
- FileData fileData = new FileData(actualFileNameVal, this.storeLocation);
- fileManager.add(fileData);
- this.fileName = storeLocation + File.separator + carbonDataFileName
+ FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation());
+ dataWriterVo.getFileManager().add(fileData);
+ this.fileName = dataWriterVo.getStoreLocation() + File.separator + carbonDataFileName
+ CarbonCommonConstants.FILE_INPROGRESS_STATUS;
this.fileCount++;
try {
@@ -332,11 +290,10 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
private int initFileCount() {
int fileInitialCount = 0;
- File[] dataFiles = new File(storeLocation).listFiles(new FileFilter() {
-
+ File[] dataFiles = new File(dataWriterVo.getStoreLocation()).listFiles(new FileFilter() {
@Override public boolean accept(File pathVal) {
- if (!pathVal.isDirectory() && pathVal.getName().startsWith(tableName) && pathVal.getName()
- .contains(CarbonCommonConstants.FACT_FILE_EXT)) {
+ if (!pathVal.isDirectory() && pathVal.getName().startsWith(dataWriterVo.getTableName())
+ && pathVal.getName().contains(CarbonCommonConstants.FACT_FILE_EXT)) {
return true;
}
return false;
@@ -359,20 +316,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
/**
   * This method will write metadata at the end of the file in thrift format
*/
- protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
- String filePath) throws CarbonDataWriterException {
- try {
- long currentPosition = channel.size();
- CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
- FileFooter convertFileMeta = CarbonMetadataUtil
- .convertFileFooter(infoList, localCardinality.length, localCardinality,
- thriftColumnSchemaList, segmentProperties);
- fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
- writer.writeFooter(convertFileMeta, currentPosition);
- } catch (IOException e) {
- throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
- }
- }
+ protected abstract void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList,
+ FileChannel channel, String filePath) throws CarbonDataWriterException;
/**
   * Below method will be used to fill the block index info details
@@ -382,7 +327,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @param filePath file path
* @param currentPosition current offset
*/
- private void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+ protected void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
String filePath, long currentPosition) {
// as min-max will change for each blocklet and second blocklet min-max can be lesser than
@@ -448,56 +393,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*
* @return BlockletInfo - blocklet metadata
*/
- protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
- // create the info object for leaf entry
- BlockletInfoColumnar infoObj = new BlockletInfoColumnar();
- // add total entry count
- infoObj.setNumberOfKeys(nodeHolder.getEntryCount());
-
- // add the key array length
- infoObj.setKeyLengths(nodeHolder.getKeyLengths());
- //add column min max data
- infoObj.setColumnMaxData(nodeHolder.getColumnMaxData());
- infoObj.setColumnMinData(nodeHolder.getColumnMinData());
- infoObj.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
- long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
-
- for (int i = 0; i < keyOffSets.length; i++) {
- keyOffSets[i] = offset;
- offset += nodeHolder.getKeyLengths()[i];
- }
- // add key offset
- infoObj.setKeyOffSets(keyOffSets);
-
- // add measure length
- infoObj.setMeasureLength(nodeHolder.getMeasureLenght());
-
- long[] msrOffset = new long[this.measureCount];
-
- for (int i = 0; i < this.measureCount; i++) {
- msrOffset[i] = offset;
- // now increment the offset by adding measure length to get the next
- // measure offset;
- offset += nodeHolder.getMeasureLenght()[i];
- }
- // add measure offset
- infoObj.setMeasureOffset(msrOffset);
- infoObj.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
- infoObj.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
- long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
- for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
- keyBlockIndexOffsets[i] = offset;
- offset += nodeHolder.getKeyBlockIndexLength()[i];
- }
- infoObj.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
- // set startkey
- infoObj.setStartKey(nodeHolder.getStartKey());
- // set end key
- infoObj.setEndKey(nodeHolder.getEndKey());
- infoObj.setCompressionModel(nodeHolder.getCompressionModel());
- // return leaf metadata
- return infoObj;
- }
+ protected abstract BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset);
/**
* Method will be used to close the open file channel
@@ -528,9 +424,9 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList);
// get the block index info thrift
List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
- String fileName = storeLocation + File.separator + carbonTablePath
- .getCarbonIndexFileName(carbonDataFileAttributes.getTaskId(),
- carbonDataFileAttributes.getFactTimeStamp());
+ String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath
+ .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
+ dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
// open file
writer.openThriftWriter(fileName);
@@ -591,11 +487,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
private void copyCarbonDataFileToCarbonStorePath(String localFileName)
throws CarbonDataWriterException {
long copyStartTime = System.currentTimeMillis();
- LOGGER.info("Copying " + localFileName + " --> " + carbonDataDirectoryPath);
+ LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath());
try {
CarbonFile localCarbonFile =
FileFactory.getCarbonFile(localFileName, FileFactory.getFileType(localFileName));
- String carbonFilePath = carbonDataDirectoryPath + localFileName
+ String carbonFilePath = dataWriterVo.getCarbonDataDirectoryPath() + localFileName
.substring(localFileName.lastIndexOf(File.separator));
copyLocalFileToCarbonStore(carbonFilePath, localFileName,
CarbonCommonConstants.BYTEBUFFER_SIZE,
@@ -654,18 +550,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   * @throws CarbonDataWriterException if any problem occurs while writing the data
*/
- protected void writeDataToFile(NodeHolder nodeHolder) throws CarbonDataWriterException {
- // write data to file and get its offset
- long offset = writeDataToFile(nodeHolder, fileChannel);
- // get the blocklet info for currently added blocklet
- BlockletInfoColumnar blockletInfo = getBlockletInfo(nodeHolder, offset);
- // add blocklet info to list
- blockletInfoList.add(blockletInfo);
- // calculate the current size of the file
- }
-
- protected abstract long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
- throws CarbonDataWriterException;
+ public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
@Override public int getLeafMetadataSize() {
return blockletInfoList.size();
@@ -675,6 +560,99 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
return this.fileName;
}
+ protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray,
+ int entryCount) {
+ byte[][] keyBlockData = new byte[keyStorageArray.length][];
+ int destPos = 0;
+ int keyBlockSizePosition = -1;
+ for (int i = 0; i < keyStorageArray.length; i++) {
+ destPos = 0;
+      //handling for high cardinality (no dictionary) dimensions
+ if (!dataWriterVo.getIsComplexType()[i] && !dataWriterVo.getIsDictionaryColumn()[i]) {
+ int totalLength = 0;
+        // calculate the total size in bytes across all the columns
+ for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) {
+ byte[] colValue = keyStorageArray[i].getKeyBlock()[k];
+ totalLength += colValue.length;
+ }
+ keyBlockData[i] = new byte[totalLength];
+
+ for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
+ int length = keyStorageArray[i].getKeyBlock()[j].length;
+ System
+ .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length);
+ destPos += length;
+ }
+ } else {
+ keyBlockSizePosition++;
+ if (dataWriterVo.getAggBlocks()[i]) {
+ keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()];
+ for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
+ System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
+ keyStorageArray[i].getKeyBlock()[j].length);
+ destPos += keyStorageArray[i].getKeyBlock()[j].length;
+ }
+ } else {
+ if (dataWriterVo.getIsComplexType()[i]) {
+ keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length * dataWriterVo
+ .getKeyBlockSize()[keyBlockSizePosition]];
+ } else {
+ keyBlockData[i] =
+ new byte[entryCount * dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]];
+ }
+ for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
+ System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
+ dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]);
+ destPos += dataWriterVo.getKeyBlockSize()[keyBlockSizePosition];
+ }
+ }
+ }
+ keyBlockData[i] = SnappyByteCompression.INSTANCE.compress(keyBlockData[i]);
+ }
+ return keyBlockData;
+ }
+
+ /**
+ * Below method will be used to update the min or max value
+ * by removing the length from it
+ *
+ * @return min max value without length
+ */
+ protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
+ ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
+ byte[] actualValue = new byte[buffer.getShort()];
+ buffer.get(actualValue);
+ return actualValue;
+ }
+
+ /**
+ * Below method will be used to update the no dictionary start and end key
+ *
+ * @param key key to be updated
+ * @return return no dictionary key
+ */
+ protected byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
+ if (key.length == 0) {
+ return key;
+ }
+    // wrap the key in a byte buffer, skipping the length part of the data
+    ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
+    // create an output buffer without the length part
+    ByteBuffer output = ByteBuffer.allocate(key.length - 2);
+    short numberOfBytesToStoreLength = 2;
+    // as the length part is removed, each no dictionary value index
+    // needs to be shifted back by 2 bytes
+    for (int i = 0; i < dataWriterVo.getNoDictionaryCount(); i++) {
+      output.putShort((short) (buffer.getShort() - numberOfBytesToStoreLength));
+ }
+ // copy the data part
+ while (buffer.hasRemaining()) {
+ output.put(buffer.get());
+ }
+ output.rewind();
+ return output.array();
+ }
+
/**
* This method will copy the carbon data file from local store location to
* carbon store location
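A reading aid for the no-dictionary helpers moved into this abstract class:
values handled by updateMinMaxForNoDictionary carry a 2-byte length prefix
in front of the actual bytes. A minimal, self-contained sketch of that
prefix handling (the demo class and sample value are hypothetical, not part
of this patch):

import java.nio.ByteBuffer;

public class NoDictionaryValueDemo {
  // strip the 2-byte short length prefix, mirroring updateMinMaxForNoDictionary
  static byte[] stripLengthPrefix(byte[] valueWithLength) {
    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
    byte[] actualValue = new byte[buffer.getShort()];
    buffer.get(actualValue);
    return actualValue;
  }

  public static void main(String[] args) {
    // encode "abc" as <length=3><'a','b','c'>
    ByteBuffer encoded = ByteBuffer.allocate(2 + 3);
    encoded.putShort((short) 3);
    encoded.put(new byte[] { 'a', 'b', 'c' });
    System.out.println(new String(stripLengthPrefix(encoded.array()))); // abc
  }
}
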
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
new file mode 100644
index 0000000..6e0287d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+
+/**
+ * Value object for writing the data
+ */
+public class CarbonDataWriterVo {
+
+ private String storeLocation;
+
+ private int measureCount;
+
+ private int mdKeyLength;
+
+ private String tableName;
+
+ private IFileManagerComposite fileManager;
+
+ private int[] keyBlockSize;
+
+ private boolean[] aggBlocks;
+
+ private boolean[] isComplexType;
+
+  private int noDictionaryCount;
+
+ private CarbonDataFileAttributes carbonDataFileAttributes;
+
+ private String databaseName;
+
+ private List<ColumnSchema> wrapperColumnSchemaList;
+
+ private int numberOfNoDictionaryColumn;
+
+ private boolean[] isDictionaryColumn;
+
+ private String carbonDataDirectoryPath;
+
+ private int[] colCardinality;
+
+ private SegmentProperties segmentProperties;
+
+ private int tableBlocksize;
+
+ /**
+ * @return the storeLocation
+ */
+ public String getStoreLocation() {
+ return storeLocation;
+ }
+
+ /**
+ * @param storeLocation the storeLocation to set
+ */
+ public void setStoreLocation(String storeLocation) {
+ this.storeLocation = storeLocation;
+ }
+
+ /**
+ * @return the measureCount
+ */
+ public int getMeasureCount() {
+ return measureCount;
+ }
+
+ /**
+ * @param measureCount the measureCount to set
+ */
+ public void setMeasureCount(int measureCount) {
+ this.measureCount = measureCount;
+ }
+
+ /**
+ * @return the mdKeyLength
+ */
+ public int getMdKeyLength() {
+ return mdKeyLength;
+ }
+
+ /**
+ * @param mdKeyLength the mdKeyLength to set
+ */
+ public void setMdKeyLength(int mdKeyLength) {
+ this.mdKeyLength = mdKeyLength;
+ }
+
+ /**
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @param tableName the tableName to set
+ */
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * @return the fileManager
+ */
+ public IFileManagerComposite getFileManager() {
+ return fileManager;
+ }
+
+ /**
+ * @param fileManager the fileManager to set
+ */
+ public void setFileManager(IFileManagerComposite fileManager) {
+ this.fileManager = fileManager;
+ }
+
+ /**
+ * @return the keyBlockSize
+ */
+ public int[] getKeyBlockSize() {
+ return keyBlockSize;
+ }
+
+ /**
+ * @param keyBlockSize the keyBlockSize to set
+ */
+ public void setKeyBlockSize(int[] keyBlockSize) {
+ this.keyBlockSize = keyBlockSize;
+ }
+
+ /**
+ * @return the aggBlocks
+ */
+ public boolean[] getAggBlocks() {
+ return aggBlocks;
+ }
+
+ /**
+ * @param aggBlocks the aggBlocks to set
+ */
+ public void setAggBlocks(boolean[] aggBlocks) {
+ this.aggBlocks = aggBlocks;
+ }
+
+ /**
+ * @return the isComplexType
+ */
+ public boolean[] getIsComplexType() {
+ return isComplexType;
+ }
+
+ /**
+ * @param isComplexType the isComplexType to set
+ */
+ public void setIsComplexType(boolean[] isComplexType) {
+ this.isComplexType = isComplexType;
+ }
+
+ /**
+ * @return the noDictionaryCount
+ */
+ public int getNoDictionaryCount() {
+    return noDictionaryCount;
+ }
+
+ /**
+ * @param noDictionaryCount the noDictionaryCount to set
+ */
+ public void setNoDictionaryCount(int noDictionaryCount) {
+    this.noDictionaryCount = noDictionaryCount;
+ }
+
+ /**
+ * @return the carbonDataFileAttributes
+ */
+ public CarbonDataFileAttributes getCarbonDataFileAttributes() {
+ return carbonDataFileAttributes;
+ }
+
+ /**
+ * @param carbonDataFileAttributes the carbonDataFileAttributes to set
+ */
+ public void setCarbonDataFileAttributes(CarbonDataFileAttributes carbonDataFileAttributes) {
+ this.carbonDataFileAttributes = carbonDataFileAttributes;
+ }
+
+ /**
+ * @return the databaseName
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * @param databaseName the databaseName to set
+ */
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
+
+ /**
+ * @return the wrapperColumnSchemaList
+ */
+ public List<ColumnSchema> getWrapperColumnSchemaList() {
+ return wrapperColumnSchemaList;
+ }
+
+ /**
+ * @param wrapperColumnSchemaList the wrapperColumnSchemaList to set
+ */
+ public void setWrapperColumnSchemaList(List<ColumnSchema> wrapperColumnSchemaList) {
+ this.wrapperColumnSchemaList = wrapperColumnSchemaList;
+ }
+
+ /**
+ * @return the numberOfNoDictionaryColumn
+ */
+ public int getNumberOfNoDictionaryColumn() {
+ return numberOfNoDictionaryColumn;
+ }
+
+ /**
+ * @param numberOfNoDictionaryColumn the numberOfNoDictionaryColumn to set
+ */
+ public void setNumberOfNoDictionaryColumn(int numberOfNoDictionaryColumn) {
+ this.numberOfNoDictionaryColumn = numberOfNoDictionaryColumn;
+ }
+
+ /**
+ * @return the isDictionaryColumn
+ */
+ public boolean[] getIsDictionaryColumn() {
+ return isDictionaryColumn;
+ }
+
+ /**
+ * @param isDictionaryColumn the isDictionaryColumn to set
+ */
+ public void setIsDictionaryColumn(boolean[] isDictionaryColumn) {
+ this.isDictionaryColumn = isDictionaryColumn;
+ }
+
+ /**
+ * @return the carbonDataDirectoryPath
+ */
+ public String getCarbonDataDirectoryPath() {
+ return carbonDataDirectoryPath;
+ }
+
+ /**
+ * @param carbonDataDirectoryPath the carbonDataDirectoryPath to set
+ */
+ public void setCarbonDataDirectoryPath(String carbonDataDirectoryPath) {
+ this.carbonDataDirectoryPath = carbonDataDirectoryPath;
+ }
+
+ /**
+ * @return the colCardinality
+ */
+ public int[] getColCardinality() {
+ return colCardinality;
+ }
+
+ /**
+ * @param colCardinality the colCardinality to set
+ */
+ public void setColCardinality(int[] colCardinality) {
+ this.colCardinality = colCardinality;
+ }
+
+ /**
+ * @return the segmentProperties
+ */
+ public SegmentProperties getSegmentProperties() {
+ return segmentProperties;
+ }
+
+ /**
+ * @param segmentProperties the segmentProperties to set
+ */
+ public void setSegmentProperties(SegmentProperties segmentProperties) {
+ this.segmentProperties = segmentProperties;
+ }
+
+ /**
+ * @return the tableBlocksize
+ */
+ public int getTableBlocksize() {
+ return tableBlocksize;
+ }
+
+ /**
+ * @param tableBlocksize the tableBlocksize to set
+ */
+ public void setTableBlocksize(int tableBlocksize) {
+ this.tableBlocksize = tableBlocksize;
+ }
+
+}
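For context on how this value object is meant to be used: callers now fill
in a single CarbonDataWriterVo instead of passing a long positional argument
list to the writer constructors. A hedged sketch, with all values below
being illustrative placeholders rather than anything taken from this patch:

CarbonDataWriterVo dataWriterVo = new CarbonDataWriterVo();
dataWriterVo.setStoreLocation("/tmp/carbon/store"); // placeholder path
dataWriterVo.setDatabaseName("default");            // placeholder names
dataWriterVo.setTableName("sales");
dataWriterVo.setMeasureCount(2);
dataWriterVo.setMdKeyLength(8);
dataWriterVo.setTableBlocksize(1024);               // block size in MB
// remaining setters (keyBlockSize, aggBlocks, isComplexType, and so on)
// would be filled in the same way before handing the VO to a writer:
AbstractFactDataWriter<int[]> writer =
    new CarbonFactDataWriterImplForIntIndexAndAggBlock(dataWriterVo);
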
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
new file mode 100644
index 0000000..d399280
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * Below class will be used to write the data in version 2 format
+ */
+public class CarbonFactDataWriterImpl2 extends CarbonFactDataWriterImplForIntIndexAndAggBlock {
+
+ /**
+ * logger
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonFactDataWriterImpl2.class.getName());
+
+ /**
+ * Constructor create instance of this class
+   * Constructor to create an instance of this class
+ * @param dataWriterVo
+ */
+ public CarbonFactDataWriterImpl2(CarbonDataWriterVo dataWriterVo) {
+ super(dataWriterVo);
+ }
+
+ /**
+   * Below method will be used to write the data to the carbon data file
+   *
+   * @param holder
+   * @throws CarbonDataWriterException if any problem occurs during the write operation
+ */
+ @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    // running total used to calculate the size of the blocklet
+ int size = 0;
+ // get the blocklet info object
+ BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
+
+ List<DataChunk2> datachunks = null;
+ try {
+ // get all the data chunks
+ datachunks = CarbonMetadataUtil
+ .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+ }
+ // data chunk byte array
+ byte[][] dataChunkByteArray = new byte[datachunks.size()][];
+ for (int i = 0; i < dataChunkByteArray.length; i++) {
+ dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
+ // add the data chunk size
+ size += dataChunkByteArray[i].length;
+ }
+ // add row id index length
+ for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+ size += holder.getKeyBlockIndexLength()[i];
+ }
+ // add rle index length
+ for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+ size += holder.getDataIndexMapLength()[i];
+ }
+ // add dimension column data page and measure column data page size
+ long blockletDataSize =
+ holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+ // if size of the file already reached threshold size then create a new file and get the file
+ // channel object
+ updateBlockletFileChannel(blockletDataSize);
+    // write the version header to the file if the current file size is zero
+    // this is done so the carbondata file can be read independently
+ try {
+ if (fileChannel.size() == 0) {
+ short version = Short.parseShort(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+ byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+ ByteBuffer buffer = ByteBuffer.allocate(header.length);
+ buffer.put(header);
+ buffer.rewind();
+ fileChannel.write(buffer);
+ }
+ } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the version header in the carbon data file", e);
+ }
+ // write data to file and get its offset
+ writeDataToFile(holder, dataChunkByteArray, fileChannel);
+ // add blocklet info to list
+ blockletInfoList.add(blockletInfo);
+ LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+ }
+
+ /**
+ * Below method will be used to write the data to file
+ * Data Format
+ * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
+ * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
+ * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
+ * <MColumn1DataChunk><MColumn1DataPage>
+ * <MColumn2DataChunk><MColumn2DataPage>
+   * <MColumn3DataChunk><MColumn3DataPage>
+ *
+ * @param nodeHolder
+ * @param dataChunksBytes
+ * @param channel
+ * @throws CarbonDataWriterException
+ */
+ private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
+ throws CarbonDataWriterException {
+ long offset = 0;
+ try {
+ offset = channel.size();
+ } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+ }
+ List<Long> currentDataChunksOffset = new ArrayList<>();
+ List<Short> currentDataChunksLength = new ArrayList<>();
+ dataChunksLength.add(currentDataChunksLength);
+ dataChunksOffsets.add(currentDataChunksOffset);
+ int bufferSize = 0;
+ int rowIdIndex = 0;
+ int rleIndex = 0;
+ for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+ currentDataChunksOffset.add(offset);
+ currentDataChunksLength.add((short) dataChunksBytes[i].length);
+ bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
+ .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
+ dataWriterVo.getAggBlocks()[i] ?
+ nodeHolder.getCompressedDataIndex()[rleIndex].length :
+ 0);
+ offset += dataChunksBytes[i].length;
+ offset += nodeHolder.getKeyLengths()[i];
+ if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+ offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
+ rowIdIndex++;
+ }
+ if (dataWriterVo.getAggBlocks()[i]) {
+ offset += nodeHolder.getDataIndexMapLength()[rleIndex];
+ rleIndex++;
+ }
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ rleIndex = 0;
+ rowIdIndex = 0;
+ for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+ buffer.put(dataChunksBytes[i]);
+ buffer.put(nodeHolder.getKeyArray()[i]);
+ if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+ buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
+ buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
+ if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
+ buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
+ }
+ rowIdIndex++;
+ }
+ if (dataWriterVo.getAggBlocks()[i]) {
+ buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
+ rleIndex++;
+ }
+ }
+ try {
+ buffer.flip();
+ channel.write(buffer);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException(
+ "Problem while writing the dimension data in carbon data file", e);
+ }
+
+ int dataChunkIndex = nodeHolder.getKeyArray().length;
+ int totalLength = 0;
+ for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+ currentDataChunksOffset.add(offset);
+ currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
+ offset += dataChunksBytes[dataChunkIndex].length;
+ offset += nodeHolder.getDataArray()[i].length;
+ totalLength += dataChunksBytes[dataChunkIndex].length;
+ totalLength += nodeHolder.getDataArray()[i].length;
+ dataChunkIndex++;
+ }
+ buffer = ByteBuffer.allocate(totalLength);
+ dataChunkIndex = nodeHolder.getKeyArray().length;
+ for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+ buffer.put(dataChunksBytes[dataChunkIndex++]);
+ buffer.put(nodeHolder.getDataArray()[i]);
+ }
+ try {
+ buffer.flip();
+ channel.write(buffer);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException(
+ "Problem while writing the measure data in carbon data file", e);
+ }
+ }
+
+ /**
+ * This method will be used to get the blocklet metadata
+ *
+ * @return BlockletInfo - blocklet metadata
+ */
+ protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+ // create the info object for leaf entry
+ BlockletInfoColumnar info = new BlockletInfoColumnar();
+ //add aggBlocks array
+ info.setAggKeyBlock(nodeHolder.getAggBlocks());
+ // add total entry count
+ info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+ // add the key array length
+ info.setKeyLengths(nodeHolder.getKeyLengths());
+ // adding null measure index bit set
+ info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+    //add column min max data
+ info.setColumnMaxData(nodeHolder.getColumnMaxData());
+ info.setColumnMinData(nodeHolder.getColumnMinData());
+
+ // add measure length
+ info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+ info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+ info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+ info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+ // set startkey
+ info.setStartKey(nodeHolder.getStartKey());
+ // set end key
+ info.setEndKey(nodeHolder.getEndKey());
+ info.setCompressionModel(nodeHolder.getCompressionModel());
+
+    //colGroup Blocks
+    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+    // return leaf metadata
+    return info;
+ }
+
+ /**
+   * This method will write metadata at the end of the file in thrift format
+ */
+ protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+ String filePath) throws CarbonDataWriterException {
+ try {
+ // get the current file position
+ long currentPosition = channel.size();
+ CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+ // get thrift file footer instance
+ FileFooter convertFileMeta = CarbonMetadataUtil
+ .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+ dataChunksOffsets, dataChunksLength);
+ // fill the carbon index details
+ fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+ // write the footer
+ writer.writeFooter(convertFileMeta, currentPosition);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+ }
+ }
+}
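Since writeBlockletData above writes a version header at offset 0 of every
new V2 file, a reader can cheaply tell such files apart. A hedged sketch of
that check (the header text and file path are assumptions for illustration;
the real header is CarbonCommonConstants.CARBON_DATA_VERSION_HEADER plus the
configured version number):

import java.io.IOException;
import java.io.RandomAccessFile;

public class VersionHeaderCheck {
  public static void main(String[] args) throws IOException {
    String expectedHeader = "CARBONDATA2"; // assumed header text + version
    try (RandomAccessFile file = new RandomAccessFile("/tmp/part-0.carbondata", "r")) {
      byte[] header = new byte[expectedHeader.length()];
      file.readFully(header); // the header bytes sit at the start of the file
      System.out.println(expectedHeader.equals(new String(header))
          ? "found version header" : "no version header");
    }
  }
}
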
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
index 259482e..8c2608b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
@@ -26,48 +26,23 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
-import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression;
import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.mdkeygen.file.IFileManagerComposite;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.FileFooter;
import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFactDataWriter<int[]> {
- protected boolean[] aggBlocks;
- private NumberCompressor numberCompressor;
- private boolean[] isComplexType;
- private int numberOfNoDictionaryColumn;
- private boolean[] isDictionaryColumn;
- private static final LogService LOGGER = LogServiceFactory.getLogService(
- CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
+ private static final LogService LOGGER = LogServiceFactory
+ .getLogService(CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
- public CarbonFactDataWriterImplForIntIndexAndAggBlock(String storeLocation, int measureCount,
- int mdKeyLength, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize,
- boolean[] aggBlocks, boolean[] isComplexType, int NoDictionaryCount,
- CarbonDataFileAttributes carbonDataFileAttributes, String databaseName,
- List<ColumnSchema> wrapperColumnSchemaList, int numberOfNoDictionaryColumn,
- boolean[] isDictionaryColumn, String carbonDataDirectoryPath, int[] colCardinality,
- SegmentProperties segmentProperties, int tableBlocksize) {
- super(storeLocation, measureCount, mdKeyLength, databaseName, tableName, fileManager,
- keyBlockSize, carbonDataFileAttributes, wrapperColumnSchemaList, carbonDataDirectoryPath,
- colCardinality, segmentProperties, tableBlocksize);
- this.isComplexType = isComplexType;
- this.databaseName = databaseName;
- this.numberOfNoDictionaryColumn = numberOfNoDictionaryColumn;
- this.isDictionaryColumn = isDictionaryColumn;
- this.aggBlocks = aggBlocks;
- this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
- CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
+ public CarbonFactDataWriterImplForIntIndexAndAggBlock(CarbonDataWriterVo dataWriterVo) {
+ super(dataWriterVo);
}
@Override
@@ -110,7 +85,7 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
}
totalKeySize += keyLengths[i];
- if (isComplexType[i] || isDictionaryColumn[i]) {
+ if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
allMinValue[i] = keyStorageArray[i].getMin();
allMaxValue[i] = keyStorageArray[i].getMax();
} else {
@@ -142,16 +117,16 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
}
}
int compressDataBlockSize = 0;
- for (int i = 0; i < aggBlocks.length; i++) {
- if (aggBlocks[i]) {
+ for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+ if (dataWriterVo.getAggBlocks()[i]) {
compressDataBlockSize++;
}
}
byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
int[] dataIndexMapLength = new int[compressDataBlockSize];
idx = 0;
- for (int i = 0; i < aggBlocks.length; i++) {
- if (aggBlocks[i]) {
+ for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+ if (dataWriterVo.getAggBlocks()[i]) {
try {
compressedDataIndex[idx] =
numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
@@ -163,13 +138,7 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
}
}
- byte[] writableKeyArray = new byte[totalKeySize];
- int startPosition = 0;
- for (int i = 0; i < keyLengths.length; i++) {
- System.arraycopy(keyBlockData[i], 0, writableKeyArray, startPosition, keyBlockData[i].length);
- startPosition += keyLengths[i];
- }
- int[] msrLength = new int[this.measureCount];
+ int[] msrLength = new int[dataWriterVo.getMeasureCount()];
// calculate the total size required for all the measure and get the
// each measure size
for (int i = 0; i < dataArray.length; i++) {
@@ -177,30 +146,9 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
totalMsrArrySize += currentMsrLenght;
msrLength[i] = currentMsrLenght;
}
- byte[] writableDataArray = new byte[totalMsrArrySize];
-
- // start position will be used for adding the measure in
- // writableDataArray after adding measure increment the start position
- // by added measure length which will be used for next measure start
- // position
- startPosition = 0;
- for (int i = 0; i < dataArray.length; i++) {
- System.arraycopy(dataArray[i], 0, writableDataArray, startPosition, dataArray[i].length);
- startPosition += msrLength[i];
- }
- // current file size;
- int indexBlockSize = 0;
- for (int i = 0; i < keyBlockIdxLengths.length; i++) {
- indexBlockSize += keyBlockIdxLengths[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
- }
-
- for (int i = 0; i < dataIndexMapLength.length; i++) {
- indexBlockSize += dataIndexMapLength[i];
- }
-
NodeHolder holder = new NodeHolder();
- holder.setDataArray(writableDataArray);
- holder.setKeyArray(writableKeyArray);
+ holder.setDataArray(dataArray);
+ holder.setKeyArray(keyBlockData);
// end key format will be <length of dictionary key><length of no
// dictionary key><DictionaryKey><No Dictionary key>
byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
@@ -235,10 +183,12 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
holder.setDataIndexMapLength(dataIndexMapLength);
holder.setCompressedDataIndex(compressedDataIndex);
holder.setCompressionModel(compressionModel);
+ holder.setTotalDimensionArrayLength(totalKeySize);
+ holder.setTotalMeasureArrayLength(totalMsrArrySize);
//setting column min max value
holder.setColumnMaxData(allMaxValue);
holder.setColumnMinData(allMinValue);
- holder.setAggBlocks(aggBlocks);
+ holder.setAggBlocks(dataWriterVo.getAggBlocks());
holder.setColGrpBlocks(colGrpBlock);
return holder;
}
@@ -252,113 +202,28 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
indexBlockSize += holder.getDataIndexMapLength()[i];
}
+
long blockletDataSize =
- holder.getKeyArray().length + holder.getDataArray().length + indexBlockSize;
+ holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
+ + indexBlockSize;
updateBlockletFileChannel(blockletDataSize);
- writeDataToFile(holder);
+ // write data to file and get its offset
+ long offset = writeDataToFile(holder, fileChannel);
+ // get the blocklet info for currently added blocklet
+ BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
+ // add blocklet info to list
+ blockletInfoList.add(blockletInfo);
LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
}
/**
- * Below method will be used to update the min or max value
- * by removing the length from it
- *
- * @return min max value without length
- */
- private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
- ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
- byte[] actualValue = new byte[buffer.getShort()];
- buffer.get(actualValue);
- return actualValue;
- }
-
- /**
- * Below method will be used to update the no dictionary start and end key
- *
- * @param key key to be updated
- * @return return no dictionary key
- */
- private byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
- if (key.length == 0) {
- return key;
- }
- // add key to byte buffer remove the length part of the data
- ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
- // create a output buffer without length
- ByteBuffer output = ByteBuffer.allocate(key.length - 2);
- short numberOfByteToStorLength = 2;
- // as length part is removed, so each no dictionary value index
- // needs to be reshuffled by 2 bytes
- for (int i = 0; i < numberOfNoDictionaryColumn; i++) {
- output.putShort((short) (buffer.getShort() - numberOfByteToStorLength));
- }
- // copy the data part
- while (buffer.hasRemaining()) {
- output.put(buffer.get());
- }
- output.rewind();
- return output.array();
- }
-
- protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray,
- int entryCount) {
- byte[][] keyBlockData = new byte[keyStorageArray.length][];
- int destPos = 0;
- int keyBlockSizePosition = -1;
- for (int i = 0; i < keyStorageArray.length; i++) {
- destPos = 0;
- //handling for high card dims
- if (!isComplexType[i] && !this.isDictionaryColumn[i]) {
- int totalLength = 0;
- // calc size of the total bytes in all the colmns.
- for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) {
- byte[] colValue = keyStorageArray[i].getKeyBlock()[k];
- totalLength += colValue.length;
- }
- keyBlockData[i] = new byte[totalLength];
-
- for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
- int length = keyStorageArray[i].getKeyBlock()[j].length;
- System
- .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length);
- destPos += length;
- }
- } else {
- keyBlockSizePosition++;
- if (aggBlocks[i]) {
- keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()];
- for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
- System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
- keyStorageArray[i].getKeyBlock()[j].length);
- destPos += keyStorageArray[i].getKeyBlock()[j].length;
- }
- } else {
- if (isComplexType[i]) {
- keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length
- * keyBlockSize[keyBlockSizePosition]];
- } else {
- keyBlockData[i] = new byte[entryCount * keyBlockSize[keyBlockSizePosition]];
- }
- for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
- System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
- keyBlockSize[keyBlockSizePosition]);
- destPos += keyBlockSize[keyBlockSizePosition];
- }
- }
- }
- keyBlockData[i] = SnappyByteCompression.INSTANCE.compress(keyBlockData[i]);
- }
- return keyBlockData;
- }
-
- /**
* This method is responsible for writing blocklet to the data file
*
   * @return file offset, the current position in the file
   * @throws CarbonDataWriterException when anything goes wrong
   * while writing the leaf file
*/
- protected long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
+ private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
throws CarbonDataWriterException {
// create byte buffer
byte[][] compressedIndex = nodeHolder.getCompressedIndex();
@@ -375,15 +240,20 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
}
ByteBuffer byteBuffer = ByteBuffer.allocate(
- nodeHolder.getKeyArray().length + nodeHolder.getDataArray().length + indexBlockSize);
+ nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
+ + indexBlockSize);
long offset = 0;
try {
// get the current offset
offset = channel.size();
// add key array to byte buffer
- byteBuffer.put(nodeHolder.getKeyArray());
+ for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
+ byteBuffer.put(nodeHolder.getKeyArray()[i]);
+ }
+ for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+ byteBuffer.put(nodeHolder.getDataArray()[i]);
+ }
// add measure data array to byte buffer
- byteBuffer.put(nodeHolder.getDataArray());
ByteBuffer buffer1 = null;
for (int i = 0; i < compressedIndex.length; i++) {
@@ -448,9 +318,9 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
// add measure length
info.setMeasureLength(nodeHolder.getMeasureLenght());
- long[] msrOffset = new long[this.measureCount];
+ long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
- for (int i = 0; i < this.measureCount; i++) {
+ for (int i = 0; i < msrOffset.length; i++) {
// increment the current offset by 4 bytes because 4 bytes will be
// used for measure byte length
// offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
@@ -489,4 +359,21 @@ public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFact
return info;
}
+ /**
+   * This method will write metadata at the end of the file in thrift format
+ */
+ protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+ String filePath) throws CarbonDataWriterException {
+ try {
+ long currentPosition = channel.size();
+ CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+ FileFooter convertFileMeta = CarbonMetadataUtil
+ .convertFileFooter(infoList, localCardinality.length, localCardinality,
+ thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+ fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+ writer.writeFooter(convertFileMeta, currentPosition);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+ }
+ }
}
\ No newline at end of file
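
The writeDataToFile change above replaces the two flat key/measure arrays with
per-column byte[][] chunks and sizes the buffer from precomputed totals. Below
is a minimal, self-contained sketch of that layout; the names dimensionChunks,
measureChunks, and indexBlock are illustrative stand-ins for the NodeHolder
accessors, not CarbonData APIs.

import java.nio.ByteBuffer;

public class BlockletBufferSketch {
  // Assembles a blocklet-style buffer: dimension blocks, then measure blocks,
  // then the index block, mirroring the put() order in writeDataToFile.
  static ByteBuffer assemble(byte[][] dimensionChunks, byte[][] measureChunks,
      byte[] indexBlock) {
    int totalDimLength = 0;
    for (byte[] chunk : dimensionChunks) {
      totalDimLength += chunk.length;   // what setTotalDimensionArrayLength caches
    }
    int totalMsrLength = 0;
    for (byte[] chunk : measureChunks) {
      totalMsrLength += chunk.length;   // what setTotalMeasureArrayLength caches
    }
    ByteBuffer buffer =
        ByteBuffer.allocate(totalDimLength + totalMsrLength + indexBlock.length);
    for (byte[] chunk : dimensionChunks) {
      buffer.put(chunk);                // key (dimension) blocks first
    }
    for (byte[] chunk : measureChunks) {
      buffer.put(chunk);                // then measure blocks
    }
    buffer.put(indexBlock);             // trailing index data
    buffer.flip();
    return buffer;
  }

  public static void main(String[] args) {
    ByteBuffer b = assemble(new byte[][] { { 1, 2 }, { 3 } },
        new byte[][] { { 4, 5, 6 } }, new byte[] { 7 });
    System.out.println("buffer size = " + b.remaining()); // prints 7
  }
}
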
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
index aa758e6..a7d14f0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
@@ -27,12 +27,12 @@ public class NodeHolder {
/**
* keyArray
*/
- private byte[] keyArray;
+ private byte[][] keyArray;
/**
* dataArray
*/
- private byte[] dataArray;
+ private byte[][] dataArray;
/**
* measureLength
@@ -139,30 +139,40 @@ public class NodeHolder {
private BitSet[] measureNullValueIndex;
/**
+ * total length of dimension values
+ */
+ private int totalDimensionArrayLength;
+
+ /**
+ * total length of all measure values
+ */
+ private int totalMeasureArrayLength;
+
+ /**
* @return the keyArray
*/
- public byte[] getKeyArray() {
+ public byte[][] getKeyArray() {
return keyArray;
}
/**
* @param keyArray the keyArray to set
*/
- public void setKeyArray(byte[] keyArray) {
+ public void setKeyArray(byte[][] keyArray) {
this.keyArray = keyArray;
}
/**
* @return the dataArray
*/
- public byte[] getDataArray() {
+ public byte[][] getDataArray() {
return dataArray;
}
/**
* @param dataArray the dataArray to set
*/
- public void setDataArray(byte[] dataArray) {
+ public void setDataArray(byte[][] dataArray) {
this.dataArray = dataArray;
}
@@ -453,4 +463,20 @@ public class NodeHolder {
public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
this.measureNullValueIndex = measureNullValueIndex;
}
+
+ public int getTotalDimensionArrayLength() {
+ return totalDimensionArrayLength;
+ }
+
+ public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
+ this.totalDimensionArrayLength = totalDimensionArrayLength;
+ }
+
+ public int getTotalMeasureArrayLength() {
+ return totalMeasureArrayLength;
+ }
+
+ public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
+ this.totalMeasureArrayLength = totalMeasureArrayLength;
+ }
}
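
With keyArray and dataArray now byte[][], a writer fills NodeHolder one column
at a time and caches the summed lengths so writeDataToFile can size its buffer
without a second pass over the chunks. A minimal sketch using the setters added
above; the chunk arrays are assumed to be already encoded per column.

import org.apache.carbondata.processing.store.writer.NodeHolder;

public class NodeHolderFillSketch {
  static NodeHolder fill(byte[][] dimensionChunks, byte[][] measureChunks) {
    int totalDimLength = 0;
    for (byte[] chunk : dimensionChunks) {
      totalDimLength += chunk.length;
    }
    int totalMsrLength = 0;
    for (byte[] chunk : measureChunks) {
      totalMsrLength += chunk.length;
    }
    NodeHolder holder = new NodeHolder();
    holder.setKeyArray(dimensionChunks);      // per-dimension blocks
    holder.setDataArray(measureChunks);       // per-measure blocks
    holder.setTotalDimensionArrayLength(totalDimLength);
    holder.setTotalMeasureArrayLength(totalMsrLength);
    return holder;
  }
}
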
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 62b442f..84192b8 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -35,9 +35,12 @@ import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.test.util.StoreCreator;
import junit.framework.TestCase;
+
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -45,10 +48,22 @@ public class BlockIndexStoreTest extends TestCase {
private BlockIndexStore indexStore;
+ private String property;
@BeforeClass public void setUp() {
+ property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
StoreCreator.createCarbonStore();
indexStore = BlockIndexStore.getInstance();
}
+
+ @AfterClass public void tearDown() {
+ if (null != property) {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, property);
+ } else {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + "");
+ }
+ }
@Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
String canonicalPath =
@@ -56,7 +71,7 @@ public class BlockIndexStoreTest extends TestCase {
File file = getPartFile();
TableBlockInfo info =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
CarbonTableIdentifier carbonTableIdentifier =
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -78,20 +93,20 @@ public class BlockIndexStoreTest extends TestCase {
File file = getPartFile();
TableBlockInfo info =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info1 =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info2 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info3 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info4 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
CarbonTableIdentifier carbonTableIdentifier =
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
@@ -133,31 +148,31 @@ public class BlockIndexStoreTest extends TestCase {
File file = getPartFile();
TableBlockInfo info =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info1 =
new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info2 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info3 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info4 =
new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info5 =
new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info6 =
new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
TableBlockInfo info7 =
new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
- file.length());
+ file.length(), (short)1);
CarbonTableIdentifier carbonTableIdentifier =
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
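
The test changes above pin the data-file version to "1" for the whole class and
restore the previous setting afterwards. A minimal sketch of that save/restore
pattern as a fragment; the CarbonProperties calls and constants are the ones
used in the diff, and the test body is elided.

String saved = CarbonProperties.getInstance()
    .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
try {
  // exercise BlockIndexStore against version-1 blocks, e.g.
  // new TableBlockInfo(path, 0, "0", new String[] { "loclhost" },
  //     length, (short) 1);
} finally {
  String fallback = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + "";
  CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
      saved != null ? saved : fallback);
}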