http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
deleted file mode 100644
index 703ad1f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal28Reader;
-import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal38Reader;
-import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal28Reader;
-import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal38Reader;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal28Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal38Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal28Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal38Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarBinaryColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarCharColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarBinaryColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarCharColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
-import org.apache.drill.exec.vector.Decimal28SparseVector;
-import org.apache.drill.exec.vector.Decimal38SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.ConvertedType;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-public class ParquetRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
-
-  // this value was inflated so that multiple value vectors can be read at once and then broken up into smaller vectors
- private static final int NUMBER_OF_VECTORS = 1;
- private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
-  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb expressed in bits
- private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
-
- // TODO - should probably find a smarter way to set this, currently 1 megabyte
- private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
- public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
- private static final String SEPERATOR = System.getProperty("file.separator");
-
-
- // used for clearing the last n bits of a byte
- public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
- // used for clearing the first n bits of a byte
- public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
-
- private int bitWidthAllFixedFields;
- private boolean allFieldsFixedLength;
- private int recordsPerBatch;
- private long totalRecords;
- private long rowGroupOffset;
-
- private List<ColumnReader> columnStatuses;
- FileSystem fileSystem;
- private long batchSize;
- Path hadoopPath;
- private VarLenBinaryReader varLengthReader;
- private ParquetMetadata footer;
- private List<SchemaPath> columns;
-
- public CodecFactoryExposer getCodecFactoryExposer() {
- return codecFactoryExposer;
- }
-
- private final CodecFactoryExposer codecFactoryExposer;
-
- int rowGroupIndex;
-
- public ParquetRecordReader(FragmentContext fragmentContext, //
- String path, //
- int rowGroupIndex, //
- FileSystem fs, //
- CodecFactoryExposer codecFactoryExposer, //
- ParquetMetadata footer, //
- List<SchemaPath> columns) throws ExecutionSetupException {
- this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
- columns);
- }
-
- public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
- String path, int rowGroupIndex, FileSystem fs,
- CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
- List<SchemaPath> columns) throws ExecutionSetupException {
- hadoopPath = new Path(path);
- fileSystem = fs;
- this.codecFactoryExposer = codecFactoryExposer;
- this.rowGroupIndex = rowGroupIndex;
- this.batchSize = batchSize;
- this.footer = footer;
- this.columns = columns;
- }
-
- public int getRowGroupIndex() {
- return rowGroupIndex;
- }
-
- public int getBitWidthAllFixedFields() {
- return bitWidthAllFixedFields;
- }
-
- public long getBatchSize() {
- return batchSize;
- }
-
- /**
- * @param type a fixed length type from the parquet library enum
-   * @return the length in bits of the type
- */
- public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
- switch (type) {
- case INT64: return 64;
- case INT32: return 32;
- case BOOLEAN: return 1;
- case FLOAT: return 32;
- case DOUBLE: return 64;
- case INT96: return 96;
- // binary and fixed length byte array
- default:
- throw new IllegalStateException("Length cannot be determined for type " + type);
- }
- }
-
- private boolean fieldSelected(MaterializedField field){
- // TODO - not sure if this is how we want to represent this
- // for now it makes the existing tests pass, simply selecting
- // all available data if no columns are provided
- if (this.columns != null){
- for (SchemaPath expr : this.columns){
- if ( field.matches(expr)){
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
-
- columnStatuses = new ArrayList<>();
- totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
- List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
- allFieldsFixedLength = true;
- ColumnDescriptor column;
- ColumnChunkMetaData columnChunkMetaData;
- int columnsToScan = 0;
-
- MaterializedField field;
- ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
- FileMetaData fileMetaData;
-
-    // TODO - figure out how to deal with this better once we add nested reading; also note where this map is used below
- // store a map from column name to converted types if they are non-null
- HashMap<String, SchemaElement> schemaElements = new HashMap<>();
- fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
- for (SchemaElement se : fileMetaData.getSchema()) {
- schemaElements.put(se.getName(), se);
- }
-
- // loop to add up the length of the fixed width columns and build the schema
- for (int i = 0; i < columns.size(); ++i) {
- column = columns.get(i);
- logger.debug("name: " + fileMetaData.getSchema().get(i).name);
- SchemaElement se = schemaElements.get(column.getPath()[0]);
- MajorType mt = toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
- field = MaterializedField.create(toFieldName(column.getPath()),mt);
- if ( ! fieldSelected(field)){
- continue;
- }
- columnsToScan++;
- // sum the lengths of all of the fixed length fields
- if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      // There is no support for the fixed binary type yet in parquet; leaving a task here as a reminder
- // TODO - implement this when the feature is added upstream
- if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
- bitWidthAllFixedFields += se.getType_length() * 8;
- } else {
- bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
- }
- } else {
- allFieldsFixedLength = false;
- }
- }
- rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
-
-    // none of the columns in the parquet file matched the requested columns from the query
- if (columnsToScan == 0){
- throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
- }
- if (allFieldsFixedLength) {
- recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
- footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
- }
- else {
- recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
- }
-
- try {
- ValueVector v;
- ConvertedType convertedType;
- SchemaElement schemaElement;
- ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
- ArrayList<NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
- // initialize all of the column read status objects
- boolean fieldFixedLength = false;
- for (int i = 0; i < columns.size(); ++i) {
- column = columns.get(i);
- columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
- schemaElement = schemaElements.get(column.getPath()[0]);
- convertedType = schemaElement.getConverted_type();
- MajorType type = toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
- field = MaterializedField.create(toFieldName(column.getPath()), type);
- // the field was not requested to be read
- if ( ! fieldSelected(field)) continue;
-
- fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
- if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
- createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
- schemaElement);
- } else {
- // create a reader and add it to the appropriate list
- getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement, varLengthColumns, nullableVarLengthColumns);
- }
- }
- varLengthReader = new VarLenBinaryReader(this, varLengthColumns, nullableVarLengthColumns);
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- private SchemaPath toFieldName(String[] paths) {
- return SchemaPath.getCompoundPath(paths);
- }
-
- private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
- if (column.getMaxDefinitionLevel() == 0) {
- return TypeProtos.DataMode.REQUIRED;
- } else {
- return TypeProtos.DataMode.OPTIONAL;
- }
- }
-
- private void resetBatch() {
- for (ColumnReader column : columnStatuses) {
- column.valuesReadInCurrentPass = 0;
- }
- for (VarLengthColumn r : varLengthReader.columns){
- r.valuesReadInCurrentPass = 0;
- }
- for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
- r.valuesReadInCurrentPass = 0;
- }
- }
-
- /**
- * @param fixedLength
- * @param descriptor
- * @param columnChunkMetaData
- * @param allocateSize - the size of the vector to create
- * @return
- * @throws SchemaChangeException
- */
- private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
- SchemaElement schemaElement)
- throws SchemaChangeException, ExecutionSetupException {
- ConvertedType convertedType = schemaElement.getConverted_type();
- // if the column is required
- if (descriptor.getMaxDefinitionLevel() == 0){
- if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
- columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
- int length = schemaElement.type_length;
- if (length <= 12) {
- columnStatuses.add(new Decimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else if (length <= 16) {
- columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- }
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
- columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else{
- if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- } else {
- columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- }
- }
- return true;
- }
- else { // if the column is nullable
- if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
- columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
- columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
- int length = schemaElement.type_length;
- if (length <= 12) {
- columnStatuses.add(new NullableDecimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else if (length <= 16) {
- columnStatuses.add(new NullableDecimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- }
- } else {
- columnStatuses.add(NullableFixedByteAlignedReaders.getNullableColumnReader(this, allocateSize, descriptor,
- columnChunkMetaData, fixedLength, v, schemaElement));
- }
- return true;
- }
- }
-
- public void readAllFixedFields(long recordsToRead, ColumnReader firstColumnStatus) throws IOException {
-
- for (ColumnReader crs : columnStatuses){
- crs.readAllFixedFields(recordsToRead, firstColumnStatus);
- }
- }
-
- @Override
- public int next() {
- resetBatch();
- long recordsToRead = 0;
- try {
- ColumnReader firstColumnStatus;
- if (columnStatuses.size() > 0){
- firstColumnStatus = columnStatuses.iterator().next();
- }
- else{
- if (varLengthReader.columns.size() > 0){
- firstColumnStatus = varLengthReader.columns.iterator().next();
- }
- else{
- firstColumnStatus = varLengthReader.nullableColumns.iterator().next();
- }
- }
-
- if (allFieldsFixedLength) {
- recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
- } else {
- recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
-
- // going to incorporate looking at length of values and copying the data into a single loop, hopefully it won't
- // get too complicated
-
- //loop through variable length data to find the maximum records that will fit in this batch
-      // this will be a bit annoying if we want to loop through row groups, columns, pages and then individual variable
- // length values...
-      // Jacques believes that variable length fields will be encoded as |length|value|length|value|...
- // cannot find more information on this right now, will keep looking
- }
-
-// logger.debug("records to read in this pass: {}", recordsToRead);
- if (allFieldsFixedLength) {
- readAllFixedFields(recordsToRead, firstColumnStatus);
- } else { // variable length columns
- long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
- readAllFixedFields(fixedRecordsToRead, firstColumnStatus);
- }
-
- return firstColumnStatus.valuesReadInCurrentPass;
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
- }
- }
-
- static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
- TypeProtos.DataMode mode, SchemaElement schemaElement) {
- return toMajorType(primitiveTypeName, 0, mode, schemaElement);
- }
-
- // TODO - move this into ParquetTypeHelper and use code generation to create the list
- static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
- TypeProtos.DataMode mode, SchemaElement schemaElement) {
- ConvertedType convertedType = schemaElement.getConverted_type();
- switch (mode) {
-
- case OPTIONAL:
- switch (primitiveTypeName) {
- case BINARY:
- if (convertedType == null) {
- return Types.optional(TypeProtos.MinorType.VARBINARY);
- }
- switch (convertedType) {
- case UTF8:
- return Types.optional(MinorType.VARCHAR);
- case DECIMAL:
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT64:
- if (convertedType == null) {
- return Types.optional(TypeProtos.MinorType.BIGINT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- case FINETIME:
- throw new UnsupportedOperationException();
- case TIMESTAMP:
- return Types.optional(MinorType.TIMESTAMP);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT32:
- if (convertedType == null) {
- return Types.optional(TypeProtos.MinorType.INT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- case DATE:
- return Types.optional(MinorType.DATE);
- case TIME:
- return Types.optional(MinorType.TIME);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case BOOLEAN:
- return Types.optional(TypeProtos.MinorType.BIT);
- case FLOAT:
- return Types.optional(TypeProtos.MinorType.FLOAT4);
- case DOUBLE:
- return Types.optional(TypeProtos.MinorType.FLOAT8);
- // TODO - Both of these are not supported by the parquet library yet (7/3/13),
- // but they are declared here for when they are implemented
- case INT96:
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
- .setMode(mode).build();
- case FIXED_LEN_BYTE_ARRAY:
- if (convertedType == null) {
- checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
- } else if (convertedType == ConvertedType.DECIMAL) {
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
- }
- case REQUIRED:
- switch (primitiveTypeName) {
- case BINARY:
- if (convertedType == null) {
- return Types.required(TypeProtos.MinorType.VARBINARY);
- }
- switch (convertedType) {
- case UTF8:
- return Types.required(MinorType.VARCHAR);
- case DECIMAL:
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT64:
- if (convertedType == null) {
- return Types.required(MinorType.BIGINT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- case FINETIME:
- throw new UnsupportedOperationException();
- case TIMESTAMP:
- return Types.required(MinorType.TIMESTAMP);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT32:
- if (convertedType == null) {
- return Types.required(MinorType.INT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- case DATE:
- return Types.required(MinorType.DATE);
- case TIME:
- return Types.required(MinorType.TIME);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case BOOLEAN:
- return Types.required(TypeProtos.MinorType.BIT);
- case FLOAT:
- return Types.required(TypeProtos.MinorType.FLOAT4);
- case DOUBLE:
- return Types.required(TypeProtos.MinorType.FLOAT8);
- // Both of these are not supported by the parquet library yet (7/3/13),
- // but they are declared here for when they are implemented
- case INT96:
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
- .setMode(mode).build();
- case FIXED_LEN_BYTE_ARRAY:
- if (convertedType == null) {
- checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
- } else if (convertedType == ConvertedType.DECIMAL) {
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
- }
- case REPEATED:
- switch (primitiveTypeName) {
- case BINARY:
- if (convertedType == null) {
- return Types.repeated(TypeProtos.MinorType.VARBINARY);
- }
- switch (schemaElement.getConverted_type()) {
- case UTF8:
- return Types.repeated(MinorType.VARCHAR);
- case DECIMAL:
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT64:
- if (convertedType == null) {
- return Types.repeated(MinorType.BIGINT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- case FINETIME:
- throw new UnsupportedOperationException();
- case TIMESTAMP:
- return Types.repeated(MinorType.TIMESTAMP);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT32:
- if (convertedType == null) {
- return Types.repeated(MinorType.INT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- case DATE:
- return Types.repeated(MinorType.DATE);
- case TIME:
- return Types.repeated(MinorType.TIME);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case BOOLEAN:
- return Types.repeated(TypeProtos.MinorType.BIT);
- case FLOAT:
- return Types.repeated(TypeProtos.MinorType.FLOAT4);
- case DOUBLE:
- return Types.repeated(TypeProtos.MinorType.FLOAT8);
- // Both of these are not supported by the parquet library yet (7/3/13),
- // but they are declared here for when they are implemented
- case INT96:
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
- .setMode(mode).build();
- case FIXED_LEN_BYTE_ARRAY:
- if (convertedType == null) {
- checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
- } else if (convertedType == ConvertedType.DECIMAL) {
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
- }
- }
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode);
- }
-
- private static void getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
- SchemaElement schemaElement, List<VarLengthColumn> varLengthColumns,
- List<NullableVarLengthColumn> nullableVarLengthColumns) throws ExecutionSetupException {
- ConvertedType convertedType = schemaElement.getConverted_type();
- switch (descriptor.getMaxDefinitionLevel()) {
- case 0:
- if (convertedType == null) {
- varLengthColumns.add(new VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement));
- return;
- }
- switch (convertedType) {
- case UTF8:
- varLengthColumns.add(new VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement));
- return;
- case DECIMAL:
- if (v instanceof Decimal28SparseVector) {
- varLengthColumns.add(new Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement));
- return;
- } else if (v instanceof Decimal38SparseVector) {
- varLengthColumns.add(new Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement));
- return;
- }
- default:
- }
- default:
- if (convertedType == null) {
- nullableVarLengthColumns.add(new NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement));
- return;
- }
- switch (convertedType) {
- case UTF8:
- nullableVarLengthColumns.add(new NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement));
- return;
- case DECIMAL:
- if (v instanceof NullableDecimal28SparseVector) {
- nullableVarLengthColumns.add(new NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement));
- return;
- } else if (v instanceof NullableDecimal38SparseVector) {
- nullableVarLengthColumns.add(new NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement));
- return;
- }
- default:
- }
- }
- throw new UnsupportedOperationException();
- }
-
- private static MinorType getDecimalType(SchemaElement schemaElement) {
- return schemaElement.getPrecision() <= 28 ? MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
- }
-
- static String join(String delimiter, String... str) {
- StringBuilder builder = new StringBuilder();
- int i = 0;
- for (String s : str) {
- builder.append(s);
- if (i < str.length) {
- builder.append(delimiter);
- }
- i++;
- }
- return builder.toString();
- }
-
- @Override
- public void cleanup() {
- for (ColumnReader column : columnStatuses) {
- column.clear();
- }
- columnStatuses.clear();
-
- for (VarLengthColumn r : varLengthReader.columns){
- r.clear();
- }
- for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
- r.clear();
- }
- varLengthReader.columns.clear();
- varLengthReader.nullableColumns.clear();
- }
-}
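A note on the fixed-width batch sizing in the deleted setup() above: when every projected column is fixed width, the reader fits as many records into a batch as the bit budget allows, capped by the row group's value count and by 65535. The standalone sketch below (illustrative only, all names hypothetical, not part of this commit) walks through the same calculation.

public class BatchSizeSketch {
  // Mirrors the fixed-width sizing in setup(): fit as many records as the bit budget
  // allows, but never more than the row group holds and never more than 65535 per batch.
  static int recordsPerBatch(long batchSizeInBits, int bitWidthAllFixedFields, long valueCountInRowGroup) {
    long byBudget = batchSizeInBits / bitWidthAllFixedFields;
    return (int) Math.min(Math.min(byBudget, valueCountInRowGroup), 65535);
  }

  public static void main(String[] args) {
    // hypothetical inputs: a 256kb budget expressed in bits, rows of one INT32 plus one DOUBLE
    System.out.println(recordsPerBatch(256L * 1024 * 8, 32 + 64, 1_000_000L)); // prints 21845
  }
}
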
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index b26f688..a336316 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -52,6 +52,7 @@ import static java.lang.Math.min;
import static java.lang.String.format;
public class ParquetRecordWriter extends ParquetOutputRecordWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -147,7 +148,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = store.memSize();
if (memSize > blockSize) {
- System.out.println("Reached block size " + blockSize);
+ logger.debug("Reached block size " + blockSize);
flush();
newSchema();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
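One aside on the logging change above: slf4j also supports parameterized messages, which avoid building the log string at all when debug logging is disabled. A minimal sketch of that alternative form (illustrative only, not what the commit itself does):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BlockSizeLogging {
  static final Logger logger = LoggerFactory.getLogger(BlockSizeLogging.class);

  static void checkBlockSize(long memSize, long blockSize) {
    if (memSize > blockSize) {
      // the {} placeholder is only substituted when debug logging is enabled
      logger.debug("Reached block size {}", blockSize);
    }
  }
}
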
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index df6581f..b4f02fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.drill.exec.store.RecordReader;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
deleted file mode 100644
index 813a799..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
-
-import parquet.bytes.BytesUtils;
-
-public class VarLenBinaryReader {
-
- ParquetRecordReader parentReader;
- final List<VarLengthColumn> columns;
- final List<NullableVarLengthColumn> nullableColumns;
-
- public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns,
- List<NullableVarLengthColumn> nullableColumns){
- this.parentReader = parentReader;
- this.nullableColumns = nullableColumns;
- this.columns = columns;
- }
-
- /**
- * Reads as many variable length values as possible.
- *
-   * @param recordsToReadInThisPass - the number of records recommended for reading from the reader
-   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from
- * @return - the number of fixed length fields that will fit in the batch
- * @throws IOException
- */
- public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
-
- long recordsReadInCurrentPass = 0;
- int lengthVarFieldsInCurrentRecord;
- boolean rowGroupFinished = false;
- byte[] bytes;
- // write the first 0 offset
- for (ColumnReader columnReader : columns) {
- columnReader.bytesReadInCurrentPass = 0;
- columnReader.valuesReadInCurrentPass = 0;
- }
- // same for the nullable columns
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- columnReader.bytesReadInCurrentPass = 0;
- columnReader.valuesReadInCurrentPass = 0;
- columnReader.nullsRead = 0;
- }
- outer: do {
- lengthVarFieldsInCurrentRecord = 0;
- for (VarLengthColumn columnReader : columns) {
- if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
- rowGroupFinished = true;
- break;
- }
- if (columnReader.pageReadStatus.currentPage == null
- || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
- columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
- if (!columnReader.pageReadStatus.next()) {
- rowGroupFinished = true;
- break;
- }
- }
- bytes = columnReader.pageReadStatus.pageDataByteArray;
-
- // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
- (int) columnReader.pageReadStatus.readPosInBytes);
- lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-
- if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
- break outer;
- }
-
- }
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- // check to make sure there is capacity for the next value (for nullables this is a check to see if there is
- // still space in the nullability recording vector)
- if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
- rowGroupFinished = true;
- break;
- }
- if (columnReader.pageReadStatus.currentPage == null
- || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
- if (!columnReader.pageReadStatus.next()) {
- rowGroupFinished = true;
- break;
- } else {
- columnReader.currDictVal = null;
- }
- }
- bytes = columnReader.pageReadStatus.pageDataByteArray;
-        // we need to read all of the lengths to determine if this value will fit in the current vector;
-        // since we can only read each definition level once, we have to store the last one, as we will need it
-        // at the start of the next read if we decide, after reading all of the varlength values in this record,
-        // that it will not fit in this batch
- if ( columnReader.currDefLevel == -1 ) {
- columnReader.currDefLevel = columnReader.pageReadStatus.definitionLevels.readInteger();
- }
- if ( columnReader.columnDescriptor.getMaxDefinitionLevel() > columnReader.currDefLevel){
- columnReader.currentValNull = true;
- columnReader.dataTypeLengthInBits = 0;
- columnReader.nullsRead++;
- continue;// field is null, no length to add to data vector
- }
-
- if (columnReader.usingDictionary) {
- if (columnReader.currDictVal == null) {
- columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes();
- }
- // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- columnReader.dataTypeLengthInBits = columnReader.currDictVal.length();
- }
- else {
- // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
- (int) columnReader.pageReadStatus.readPosInBytes);
- }
- lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-
- if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
- break outer;
- }
- }
- // check that the next record will fit in the batch
- if (rowGroupFinished || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + lengthVarFieldsInCurrentRecord
- > parentReader.getBatchSize()){
- break outer;
- }
- for (VarLengthColumn columnReader : columns) {
- bytes = columnReader.pageReadStatus.pageDataByteArray;
-        // again, I am re-purposing the unused field here, it is a length in BYTES, not bits
- boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
- (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
- assert success;
- columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
- columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
- columnReader.pageReadStatus.valuesRead++;
- columnReader.valuesReadInCurrentPass++;
- }
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- bytes = columnReader.pageReadStatus.pageDataByteArray;
-        // again, I am re-purposing the unused field here, it is a length in BYTES, not bits
- if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){
- boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
- (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
- assert success;
- }
- columnReader.currentValNull = false;
- columnReader.currDefLevel = -1;
- if (columnReader.dataTypeLengthInBits > 0){
- columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
- columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
- }
- columnReader.pageReadStatus.valuesRead++;
- columnReader.valuesReadInCurrentPass++;
- if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
- columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
- columnReader.pageReadStatus.next();
- }
- columnReader.currDictVal = null;
- }
- recordsReadInCurrentPass++;
- } while (recordsReadInCurrentPass < recordsToReadInThisPass);
- for (VarLengthColumn columnReader : columns) {
- columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
- }
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
- }
- return recordsReadInCurrentPass;
- }
-}
\ No newline at end of file
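For readers tracing the deleted readFields() loop above: plain-encoded variable-length values sit in the page as a 4-byte little-endian length prefix followed by that many value bytes, which is why the reader advances readPosInBytes and bytesReadInCurrentPass by the value length plus 4 for every value it copies. The standalone sketch below (illustrative only; it uses plain java.nio instead of the Parquet and Drill classes) walks the same layout.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

class PlainVarLenWalk {
  // Walks a plain-encoded var-length page laid out as |int32 length|bytes|int32 length|bytes|...
  static List<byte[]> readValues(byte[] pageData, int valueCount) {
    List<byte[]> out = new ArrayList<>();
    ByteBuffer buf = ByteBuffer.wrap(pageData).order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < valueCount; i++) {
      int len = buf.getInt();   // 4-byte little-endian length prefix
      byte[] value = new byte[len];
      buf.get(value);           // the value itself; the buffer position advances len bytes
      out.add(value);
    }
    return out;
  }
}
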
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
deleted file mode 100644
index 56f687c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store.parquet;
-
-import java.math.BigDecimal;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
-import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
-import org.apache.drill.exec.vector.Decimal28SparseVector;
-import org.apache.drill.exec.vector.Decimal38SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.io.api.Binary;
-
-public class VarLengthColumnReaders {
-
- public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
-
- Binary currDictVal;
-
- VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- usingDictionary = true;
- }
- else {
- usingDictionary = false;
- }
- }
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
-
- public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
-
- public abstract int capacity();
-
- }
-
- public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader {
-
- int nullsRead;
- boolean currentValNull = false;
- Binary currDictVal;
-
- NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- public abstract boolean setSafe(int index, byte[] value, int start, int length);
-
- public abstract int capacity();
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
- }
-
- public static class Decimal28Column extends VarLengthColumn<Decimal28SparseVector> {
-
- protected Decimal28SparseVector decimal28Vector;
-
- Decimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal28SparseVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- this.decimal28Vector = v;
- }
-
- @Override
- public boolean setSafe(int index, byte[] bytes, int start, int length) {
- int width = Decimal28SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
- if (index >= decimal28Vector.getValueCapacity()) {
- return false;
- }
- DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
- schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
- return true;
- }
-
- @Override
- public int capacity() {
- return decimal28Vector.getData().capacity();
- }
- }
-
- public static class NullableDecimal28Column extends NullableVarLengthColumn<NullableDecimal28SparseVector> {
-
- protected NullableDecimal28SparseVector nullableDecimal28Vector;
-
- NullableDecimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- nullableDecimal28Vector = v;
- }
-
- @Override
- public boolean setSafe(int index, byte[] bytes, int start, int length) {
- int width = Decimal28SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
- if (index >= nullableDecimal28Vector.getValueCapacity()) {
- return false;
- }
- DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(),
- schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
- nullableDecimal28Vector.getMutator().setIndexDefined(index);
- return true;
- }
-
- @Override
- public int capacity() {
- return nullableDecimal28Vector.getData().capacity();
- }
- }
-
- public static class Decimal38Column extends VarLengthColumn<Decimal38SparseVector> {
-
- protected Decimal38SparseVector decimal28Vector;
-
- Decimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal38SparseVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- decimal28Vector = v;
- }
-
- @Override
- public boolean setSafe(int index, byte[] bytes, int start, int length) {
- int width = Decimal38SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
- if (index >= decimal28Vector.getValueCapacity()) {
- return false;
- }
- DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
- schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
- return true;
- }
-
- @Override
- public int capacity() {
- return decimal28Vector.getData().capacity();
- }
- }
-
- public static class NullableDecimal38Column extends NullableVarLengthColumn<NullableDecimal38SparseVector> {
-
- protected NullableDecimal38SparseVector nullableDecimal38Vector;
-
- NullableDecimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal38SparseVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- nullableDecimal38Vector = v;
- }
-
- @Override
- public boolean setSafe(int index, byte[] bytes, int start, int length) {
- int width = Decimal38SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
- if (index >= nullableDecimal38Vector.getValueCapacity()) {
- return false;
- }
- DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(),
- schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
- nullableDecimal38Vector.getMutator().setIndexDefined(index);
- return true;
- }
-
- @Override
- public int capacity() {
- return nullableDecimal38Vector.getData().capacity();
- }
- }
-
-
- public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
-
- // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
- protected VarCharVector varCharVector;
-
- VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- varCharVector = v;
- }
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean setSafe(int index, byte[] bytes, int start, int length) {
- boolean success;
- if(index >= varCharVector.getValueCapacity()) return false;
-
- if (usingDictionary) {
- success = varCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
- }
- else {
- success = varCharVector.getMutator().setSafe(index, bytes, start, length);
- }
- return success;
- }
-
- @Override
- public int capacity() {
- return varCharVector.getData().capacity();
- }
- }
-
- public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> {
-
- int nullsRead;
- boolean currentValNull = false;
- // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
- protected NullableVarCharVector nullableVarCharVector;
-
- NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- nullableVarCharVector = v;
- }
-
- public boolean setSafe(int index, byte[] value, int start, int length) {
- boolean success;
- if(index >= nullableVarCharVector.getValueCapacity()) return false;
-
- if (usingDictionary) {
- success = nullableVarCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
- }
- else {
- success = nullableVarCharVector.getMutator().setSafe(index, value, start, length);
- }
- return success;
- }
-
- @Override
- public int capacity() {
- return nullableVarCharVector.getData().capacity();
- }
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
- }
-
- public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> {
-
- // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
- protected VarBinaryVector varBinaryVector;
-
- VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- varBinaryVector = v;
- }
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean setSafe(int index, byte[] bytes, int start, int length) {
- boolean success;
- if(index >= varBinaryVector.getValueCapacity()) return false;
-
- if (usingDictionary) {
- success = varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
- }
- else {
- success = varBinaryVector.getMutator().setSafe(index, bytes, start, length);
- }
- return success;
- }
-
- @Override
- public int capacity() {
- return varBinaryVector.getData().capacity();
- }
- }
-
- public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> {
-
- int nullsRead;
- boolean currentValNull = false;
- // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
- protected org.apache.drill.exec.vector.NullableVarBinaryVector nullableVarBinaryVector;
-
- NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- nullableVarBinaryVector = v;
- }
-
- public boolean setSafe(int index, byte[] value, int start, int length) {
- boolean success;
- if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
-
- if (usingDictionary) {
- success = nullableVarBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
- }
- else {
- success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length);
- }
- return success;
- }
-
- @Override
- public int capacity() {
- return nullableVarBinaryVector.getData().capacity();
- }
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
- }
-}
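The deleted column classes above share one contract worth noting: setSafe() returns false rather than throwing when the target vector has no room left, so the outer read loop can close out the batch and retry the same value on the next pass. A minimal sketch of that contract (illustrative only; this is not Drill's vector API):

import java.util.Arrays;

class BoundedSlots {
  private final byte[][] slots;

  BoundedSlots(int capacity) {
    this.slots = new byte[capacity][];
  }

  // Mirrors the setSafe contract: report failure instead of throwing when full,
  // so the caller can end the current batch and retry the value in the next pass.
  boolean setSafe(int index, byte[] bytes, int start, int length) {
    if (index >= slots.length) {
      return false;
    }
    slots[index] = Arrays.copyOfRange(bytes, start, start + length);
    return true;
  }
}
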
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
new file mode 100644
index 0000000..2c6e488
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+final class BitReader extends ColumnReader {
+
+ private byte currentByte;
+ private byte nextByte;
+ private byte[] bytes;
+
+ BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ readStartInBytes = pageReader.readPosInBytes;
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+ bytes = pageReader.pageDataByteArray;
+ // standard read, using memory mapping
+ if (pageReader.bitShift == 0) {
+ ((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
+ (int) readStartInBytes, (int) readLength);
+ } else { // read individual values, because a bit shift is needed based on where the last page or batch ended
+
+ vectorData = ((BaseDataValueVector) valueVec).getData();
+ nextByte = bytes[(int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)];
+ readLengthInBits = recordsReadInThisIteration + pageReader.bitShift;
+
+ int i = 0;
+ // read individual bytes with appropriate shifting
+ for (; i < (int) readLength; i++) {
+ currentByte = nextByte;
+ currentByte = (byte) (currentByte >>> pageReader.bitShift);
+ // mask the bits about to be added from the next byte
+ currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReader.bitShift - 1]);
+ // if we are not on the last byte
+ if ((int) Math.ceil(pageReader.valuesRead / 8.0) + i < pageReader.byteLength) {
+ // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits
+ nextByte = bytes[(int) Math.ceil(pageReader.valuesRead / 8.0) + i];
+ currentByte = (byte) (currentByte | nextByte
+ << (8 - pageReader.bitShift)
+ & ParquetRecordReader.endBitMasks[8 - pageReader.bitShift - 1]);
+ }
+ vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte);
+ }
+ vectorData.setIndex(0, (valuesReadInCurrentPass / 8)
+ + (int) readLength - 1);
+ vectorData.capacity(vectorData.writerIndex() + 1);
+ }
+
+ // if the values in this page did not end on a byte boundary, store the number of bits the next page must be
+ // shifted by so that all of the values can be read into the vector without leaving a gap
+ if (readLengthInBits % 8 != 0) {
+ pageReader.bitShift = (int) readLengthInBits % 8;
+ } else {
+ pageReader.bitShift = 0;
+ }
+ }
+}
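
When pageReader.bitShift is non-zero, BitReader above stitches each output byte from the leftover high bits of the current byte and the low bits of the next one. A hedged, standalone sketch of that shift-and-mask step (the names and mask handling here are illustrative, not the startBitMasks/endBitMasks tables used by ParquetRecordReader):

public class BitStitch {
  // Suppose a previous read consumed `shift` bits of `current`; the remaining
  // (8 - shift) bits must be combined with the first `shift` bits of `next`
  // to form one full output byte.
  static byte stitch(byte current, byte next, int shift) {
    int low = (current & 0xFF) >>> shift;          // leftover high bits, moved down
    int high = (next & 0xFF) << (8 - shift);       // low bits of next byte, moved up
    return (byte) ((low | high) & 0xFF);
  }

  public static void main(String[] args) {
    byte current = (byte) 0b1011_0100;  // low 3 bits already consumed when shift = 3
    byte next    = (byte) 0b0000_0101;
    // expected: top 3 bits come from `next` (101), low 5 bits from `current` (10110)
    System.out.println(Integer.toBinaryString(stitch(current, next, 3) & 0xFF)); // 10110110
  }
}

BitReader additionally records the new bit offset in pageReader.bitShift so the next page or batch continues exactly where this one stopped.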
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
new file mode 100644
index 0000000..fd672d6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import java.io.IOException;
+
+public abstract class ColumnReader<V extends ValueVector> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+
+ final ParquetRecordReader parentReader;
+
+ // Value Vector for this column
+ final V valueVec;
+
+ ColumnDescriptor getColumnDescriptor() {
+ return columnDescriptor;
+ }
+
+ // column description from the parquet library
+ final ColumnDescriptor columnDescriptor;
+ // metadata of the column, from the parquet library
+ final ColumnChunkMetaData columnChunkMetaData;
+ // status information on the current page
+ PageReader pageReader;
+
+ final SchemaElement schemaElement;
+ boolean usingDictionary;
+
+ // quick reference to see if the field is fixed length (as this requires an instanceof)
+ final boolean isFixedLength;
+
+ // counter for the total number of values read from one or more pages
+ // when a batch is filled all of these values should be the same for all of the columns
+ int totalValuesRead;
+
+ // counter for the values that have been read in this pass (a single call to the next() method)
+ int valuesReadInCurrentPass;
+
+ // length of single data value in bits, if the length is fixed
+ int dataTypeLengthInBits;
+ int bytesReadInCurrentPass;
+
+ protected ByteBuf vectorData;
+ // definition levels for nullable columns arrive as a one-way stream of integers; when reading var length data,
+ // where we don't know if all of the records will fit until we've read all of them,
+ // we must store the last definition level and use it at the start of the next batch
+ int currDefLevel;
+
+ // variables for a single read pass
+ long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
+
+ protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
+ this.parentReader = parentReader;
+ this.columnDescriptor = descriptor;
+ this.columnChunkMetaData = columnChunkMetaData;
+ this.isFixedLength = fixedLength;
+ this.schemaElement = schemaElement;
+ this.valueVec = v;
+ this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData);
+
+ if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
+ } else {
+ dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
+ }
+ }
+
+ }
+
+ public int getRecordsReadInCurrentPass() {
+ return valuesReadInCurrentPass;
+ }
+
+ public void processPages(long recordsToReadInThisPass) throws IOException {
+ reset();
+ do {
+ determineSize(recordsToReadInThisPass, 0);
+
+ } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+ valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
+ }
+
+ public void clear() {
+ valueVec.clear();
+ this.pageReader.clear();
+ }
+
+ public void readValues(long recordsToRead) {
+ readField(recordsToRead);
+
+ valuesReadInCurrentPass += recordsReadInThisIteration;
+ totalValuesRead += recordsReadInThisIteration;
+ pageReader.valuesRead += recordsReadInThisIteration;
+ pageReader.readPosInBytes = readStartInBytes + readLength;
+ }
+
+ protected abstract void readField(long recordsToRead);
+
+ /**
+ * Determines the size of a single value in a variable column.
+ *
+ * Return value indicates if we have finished a row group and should stop reading
+ *
+ * @param recordsReadInCurrentPass
+ * @param lengthVarFieldsInCurrentRecord
+ * @return - true if we should stop reading
+ * @throws IOException
+ */
+ public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException {
+
+ boolean doneReading = readPage();
+ if (doneReading)
+ return true;
+
+ doneReading = processPageData((int) recordsReadInCurrentPass);
+ if (doneReading)
+ return true;
+
+ lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+
+ doneReading = checkVectorCapacityReached();
+ if (doneReading)
+ return true;
+
+ return false;
+ }
+
+ protected void readRecords(int recordsToRead) {
+ for (int i = 0; i < recordsToRead; i++) {
+ readField(i);
+ }
+ pageReader.valuesRead += recordsToRead;
+ }
+
+ protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
+ readValues(recordsToReadInThisPass);
+ return true;
+ }
+
+ public void updatePosition() {}
+
+ public void updateReadyToReadPosition() {}
+
+ public void reset() {
+ readStartInBytes = 0;
+ readLength = 0;
+ readLengthInBits = 0;
+ recordsReadInThisIteration = 0;
+ bytesReadInCurrentPass = 0;
+ vectorData = ((BaseValueVector) valueVec).getData();
+ }
+
+ public int capacity() {
+ return (int) (valueVec.getValueCapacity() * dataTypeLengthInBits / 8.0);
+ }
+
+ // Read a page if we need more data, returns true if we need to exit the read loop
+ public boolean readPage() throws IOException {
+ if (pageReader.currentPage == null
+ || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) {
+ readRecords(pageReader.valuesReadyToRead);
+ if (pageReader.currentPage != null)
+ totalValuesRead += pageReader.currentPage.getValueCount();
+ if (!pageReader.next()) {
+ hitRowGroupEnd();
+ return true;
+ }
+ postPageRead();
+ }
+ return false;
+ }
+
+ protected int totalValuesReadAndReadyToReadInPage() {
+ return pageReader.valuesRead + pageReader.valuesReadyToRead;
+ }
+
+ protected void postPageRead() {
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected void hitRowGroupEnd() {}
+
+ protected boolean checkVectorCapacityReached() {
+ if (bytesReadInCurrentPass + dataTypeLengthInBits > capacity()) {
+ logger.debug("Reached the capacity of the data vector in a variable length value vector.");
+ return true;
+ }
+ else if (valuesReadInCurrentPass > valueVec.getValueCapacity()){
+ return true;
+ }
+ return false;
+ }
+}
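
The processPages()/determineSize()/readPage() trio above forms one batch-filling loop: fetch a page when the current one is exhausted, decode values from it, and stop once the batch is full or the row group has no more pages. A simplified, self-contained sketch of that loop structure (FakePageSource is a hypothetical stand-in for PageReader; capacity checks and definition-level handling are omitted):

import java.util.ArrayDeque;
import java.util.Deque;

public class BatchFill {
  static class FakePageSource {
    final Deque<int[]> pages = new ArrayDeque<>();
    int[] currentPage;
    int valuesRead;                       // position within the current page

    boolean next() {                      // advance to the next page, false at row-group end
      currentPage = pages.poll();
      valuesRead = 0;
      return currentPage != null;
    }
  }

  static int fillBatch(FakePageSource source, int[] batch) {
    int valuesInBatch = 0;
    do {
      if (source.currentPage == null || source.valuesRead == source.currentPage.length) {
        if (!source.next()) break;        // hit the end of the row group
      }
      // read as many values as fit from the current page into the batch
      int toRead = Math.min(source.currentPage.length - source.valuesRead,
                            batch.length - valuesInBatch);
      System.arraycopy(source.currentPage, source.valuesRead, batch, valuesInBatch, toRead);
      source.valuesRead += toRead;
      valuesInBatch += toRead;
    } while (valuesInBatch < batch.length);
    return valuesInBatch;
  }

  public static void main(String[] args) {
    FakePageSource src = new FakePageSource();
    src.pages.add(new int[]{1, 2, 3});
    src.pages.add(new int[]{4, 5});
    int[] batch = new int[4];
    System.out.println(fillBatch(src, batch)); // 4 -> batch holds 1,2,3,4; value 5 stays in the page
  }
}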
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
new file mode 100644
index 0000000..243744e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -0,0 +1,175 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ColumnReaderFactory {
+
+ /**
+ * Creates a reader for a fixed-width column.
+ *
+ * @param fixedLength - true if the column is fixed width
+ * @param descriptor - column descriptor from the parquet library
+ * @param columnChunkMetaData - metadata of the column chunk, from the parquet library
+ * @param allocateSize - the size of the vector to create
+ * @return - a ColumnReader for the column described by the metadata
+ * @throws Exception if the parquet metadata does not describe a supported column
+ */
+ static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+ SchemaElement schemaElement)
+ throws Exception {
+ ConvertedType convertedType = schemaElement.getConverted_type();
+ // if the column is required, or repeated (in which case we just use this to generate the appropriate
+ // ColumnReader for transferring data into the data vector inside of our repeated vector)
+ if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0){
+ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+ return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
+ int length = schemaElement.type_length;
+ if (length <= 12) {
+ return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (length <= 16) {
+ return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+ return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else{
+ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ } else {
+ return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ }
+ }
+ }
+ else { // if the column is nullable
+ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+ return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+ return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
+ int length = schemaElement.type_length;
+ if (length <= 12) {
+ return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (length <= 16) {
+ return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ } else {
+ return getNullableColumnReader(recordReader, allocateSize, descriptor,
+ columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ }
+ throw new Exception("Unexpected parquet metadata configuration.");
+ }
+
+ static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+ SchemaElement schemaElement
+ ) throws ExecutionSetupException {
+ ConvertedType convertedType = schemaElement.getConverted_type();
+ switch (descriptor.getMaxDefinitionLevel()) {
+ case 0:
+ if (convertedType == null) {
+ return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ }
+ switch (convertedType) {
+ case UTF8:
+ return new VarLengthColumnReaders.VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
+ case DECIMAL:
+ if (v instanceof Decimal28SparseVector) {
+ return new VarLengthColumnReaders.Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement);
+ } else if (v instanceof Decimal38SparseVector) {
+ return new VarLengthColumnReaders.Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement);
+ }
+ default:
+ }
+ default:
+ if (convertedType == null) {
+ return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+ }
+ switch (convertedType) {
+ case UTF8:
+ return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
+ case DECIMAL:
+ if (v instanceof NullableDecimal28SparseVector) {
+ return new VarLengthColumnReaders.NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement);
+ } else if (v instanceof NullableDecimal38SparseVector) {
+ return new VarLengthColumnReaders.NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement);
+ }
+ default:
+ }
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+ ColumnDescriptor columnDescriptor,
+ ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength,
+ ValueVector valueVec,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, valueVec, schemaElement);
+ } else {
+ if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+ }
+ else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableIntVector)valueVec, schemaElement);
+ }
+ else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+ }
+ else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+ }
+ else{
+ throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
+ }
+ }
+ }
+}
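
For FIXED_LEN_BYTE_ARRAY decimals, the factory above keys the reader choice on type_length: up to 12 bytes goes to a Decimal28 reader, up to 16 bytes to a Decimal38 reader. That lines up with how many decimal digits a signed two's-complement value of N bytes can always hold, roughly floor(log10(2^(8N - 1))). A small worked sketch of that calculation (the DecimalWidth class is illustrative only, not part of Drill):

public class DecimalWidth {
  static int maxDigits(int typeLengthBytes) {
    // the largest magnitude representable is 2^(8N - 1) - 1, so any number with
    // this many digits or fewer fits in the fixed-length byte array
    return (int) Math.floor((8 * typeLengthBytes - 1) * Math.log10(2));
  }

  public static void main(String[] args) {
    System.out.println(maxDigits(12)); // 28
    System.out.println(maxDigits(16)); // 38
  }
}

At 12 bytes this gives 28 digits and at 16 bytes 38 digits, matching the Decimal28Sparse and Decimal38Sparse vectors the chosen readers fill.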