http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
new file mode 100644
index 0000000..4513aaa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.DateVector;
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.math.BigDecimal;
+
+class FixedByteAlignedReader extends ColumnReader {
+
+ protected byte[] bytes;
+
+
+ FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ readStartInBytes = pageReader.readPosInBytes;
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+ bytes = pageReader.pageDataByteArray;
+ // vectorData is assigned by the superclass read loop method
+ writeData();
+ }
+
+ protected void writeData() {
+ vectorData.writeBytes(bytes,
+ (int) readStartInBytes, (int) readLength);
+ }
+
+ public static abstract class ConvertedReader extends FixedByteAlignedReader {
+
+ protected int dataTypeLengthInBytes;
+
+ ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ public void writeData() {
+ dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ addNext((int)readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
+ }
+ }
+
+ /**
+ * Reads from bytes, converts, and writes to buffer
+ * @param start the index in bytes to start reading from
+ * @param index the position in the ValueVector to write to
+ */
+ abstract void addNext(int start, int index);
+ }
+
+ public static class DateReader extends ConvertedReader {
+
+ DateVector dateVector;
+
+ DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ dateVector = (DateVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+ NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytes, start)
+ - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ }
+ }
+
+ public static class Decimal28Reader extends ConvertedReader {
+
+ Decimal28SparseVector decimal28Vector;
+
+ Decimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ decimal28Vector = (Decimal28SparseVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ int width = Decimal28SparseHolder.WIDTH;
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+ DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
+ schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
+ }
+ }
+
+ public static class Decimal38Reader extends ConvertedReader {
+
+ Decimal38SparseVector decimal38Vector;
+
+ Decimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ decimal38Vector = (Decimal38SparseVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ int width = Decimal38SparseHolder.WIDTH;
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+ DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
+ schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
+ }
+ }
+}
\ No newline at end of file
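
The bulk-copy arithmetic in readField() above reduces to rounding a bit count up to whole bytes. A minimal standalone sketch of that calculation (the class and method names here are illustrative, not part of the commit):

    class FixedWidthCopyMath {
      // mirrors readField(): bits to copy = records * width, rounded up to whole bytes
      static int bytesToCopy(int recordsToCopy, int dataTypeLengthInBits) {
        long readLengthInBits = (long) recordsToCopy * dataTypeLengthInBits;
        return (int) Math.ceil(readLengthInBits / 8.0);
      }

      public static void main(String[] args) {
        // 1000 INT32 values (32 bits each) -> 4000 bytes handed to a single writeBytes() call
        System.out.println(bytesToCopy(1000, 32));
      }
    }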
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
new file mode 100644
index 0000000..bbff574
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -0,0 +1,213 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+public class FixedWidthRepeatedReader extends VarLengthColumn {
+
+ RepeatedFixedWidthVector castedRepeatedVector;
+ ColumnReader dataReader;
+ int dataTypeLengthInBytes;
+ // we can do a vector copy of the data once we figure out how much we need to copy
+ // this tracks the number of values to transfer (the dataReader will translate this to a number
+ // of bytes to transfer and re-use the code from the non-repeated types)
+ int valuesToRead;
+ int repeatedGroupsReadInCurrentPass;
+ int repeatedValuesInCurrentList;
+ // empty lists are denoted by definition levels; to stop reading at the correct time, we must keep
+ // track of the number of empty lists as well as the combined length of all of the defined lists
+ int definitionLevelsRead;
+ // parquet currently does not prevent lists from spanning page boundaries for repeated values, so we must
+ // track when this happens and defer some of the state updates until we know the full length of the repeated
+ // value for the current record
+ boolean notFishedReadingList;
+ byte[] leftOverBytes;
+
+ FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
+ castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
+ this.dataTypeLengthInBytes = dataTypeLengthInBytes;
+ this.dataReader = dataReader;
+ this.dataReader.pageReader = this.pageReader;
+ // this is not in the reset method because it only needs to be initialized before the very first page read;
+ // in all other cases, if a read ends at a page boundary we need to keep track of this flag and not
+ // clear it at the start of the next read loop
+ notFishedReadingList = false;
+ }
+
+ public void reset() {
+ bytesReadInCurrentPass = 0;
+ valuesReadInCurrentPass = 0;
+ pageReader.valuesReadyToRead = 0;
+ dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getData();
+ dataReader.valuesReadInCurrentPass = 0;
+ repeatedGroupsReadInCurrentPass = 0;
+ }
+
+ public int getRecordsReadInCurrentPass() {
+ return repeatedGroupsReadInCurrentPass;
+ }
+
+ @Override
+ protected void readField(long recordsToRead) {
+ // no-op: this reader delegates the actual value copying to the wrapped dataReader via readRecords()
+ }
+
+ public boolean skipReadyToReadPositionUpdate() {
+ return false;
+ }
+
+ public void updateReadyToReadPosition() {
+ valuesToRead += repeatedValuesInCurrentList;
+ pageReader.valuesReadyToRead += repeatedValuesInCurrentList;
+ repeatedGroupsReadInCurrentPass++;
+ currDictVal = null;
+ if ( ! notFishedReadingList)
+ repeatedValuesInCurrentList = -1;
+ }
+
+ public void updatePosition() {
+ pageReader.readPosInBytes += dataTypeLengthInBits;
+ bytesReadInCurrentPass += dataTypeLengthInBits;
+ valuesReadInCurrentPass++;
+ }
+
+ public void hitRowGroupEnd() {
+ pageReader.valuesReadyToRead = 0;
+ definitionLevelsRead = 0;
+ }
+
+ public void postPageRead() {
+ super.postPageRead();
+ // resetting this unconditionally is no longer correct, as lists can span page boundaries
+ if ( ! notFishedReadingList)
+ repeatedValuesInCurrentList = -1;
+ definitionLevelsRead = 0;
+ }
+
+ protected int totalValuesReadAndReadyToReadInPage() {
+ // we need to prevent the page reader from getting rid of the current page in the case where we have a repeated
+ // value split across a page boundary
+ if (notFishedReadingList) {
+ return definitionLevelsRead - repeatedValuesInCurrentList;
+ }
+ return definitionLevelsRead;
+ }
+
+ protected boolean checkVectorCapacityReached() {
+ boolean doneReading = super.checkVectorCapacityReached();
+ if (doneReading)
+ return true;
+ if (valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity())
+ return true;
+ else
+ return false;
+ }
+
+ protected boolean readAndStoreValueSizeInformation() {
+ boolean readingValsAcrossPageBoundary = false;
+ int numLeftoverVals = 0;
+ if (notFishedReadingList) {
+ numLeftoverVals = repeatedValuesInCurrentList;
+ readRecords(numLeftoverVals);
+ readingValsAcrossPageBoundary = true;
+ notFishedReadingList = false;
+ pageReader.valuesReadyToRead = 0;
+ try {
+ boolean stopReading = readPage();
+ if (stopReading) {
+ // hit the end of a row group
+ return false;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected error reading parquet repeated column.", e);
+ }
+ }
+ if ( currDefLevel == -1 ) {
+ currDefLevel = pageReader.definitionLevels.readInteger();
+ definitionLevelsRead++;
+ }
+ int repLevel;
+ if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel){
+ if (repeatedValuesInCurrentList == -1 || notFishedReadingList) {
+ repeatedValuesInCurrentList = 1;
+ do {
+ repLevel = pageReader.repetitionLevels.readInteger();
+ if (repLevel > 0) {
+ repeatedValuesInCurrentList++;
+ currDefLevel = pageReader.definitionLevels.readInteger();
+ definitionLevelsRead++;
+
+ // we hit the end of this page, without confirmation that we reached the end of the current record
+ if (definitionLevelsRead == pageReader.currentPage.getValueCount()) {
+ // check that we have not hit the end of the row group (in which case we will not find a repetition level indicating
+ // the end of this record, as there is no next page to check; we have read all the values in this repetition, so it is
+ // okay to add it to the read)
+ if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()){
+ notFishedReadingList = true;
+ // if we hit this case, we cut off the current batch at the previous value; these extra values, as well
+ // as those that spill into the next page, will be added to the next batch
+ return true;
+ }
+ }
+ }
+ } while (repLevel != 0);
+ }
+ }
+ else {
+ repeatedValuesInCurrentList = 0;
+ }
+ int currentValueListLength = repeatedValuesInCurrentList;
+ if (readingValsAcrossPageBoundary) {
+ currentValueListLength += numLeftoverVals;
+ }
+ // this should not fail
+ if (!castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
+ currentValueListLength)) {
+ return true;
+ }
+ // this field is referenced in the superclass determineSize method, so we need to set it here;
+ // as elsewhere, it is re-purposed to hold a length in BYTES to avoid repetitive multiplication/division
+ dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;
+ return false;
+ }
+
+ protected void readRecords(int valuesToRead) {
+ if (valuesToRead == 0) return;
+ // TODO - validate that this works in all cases, it fixes a bug when reading from multiple pages into
+ // a single vector
+ dataReader.valuesReadInCurrentPass = 0;
+ dataReader.readValues(valuesToRead);
+ valuesReadInCurrentPass += valuesToRead;
+ castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass);
+ }
+
+ @Override
+ public int capacity() {
+ return castedRepeatedVector.getMutator().getDataVector().getData().capacity();
+ }
+}
+
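FixedWidthRepeatedReader above derives each record's list length from the definition and repetition levels it consumes in readAndStoreValueSizeInformation(). A simplified, self-contained sketch of that decoding, using plain arrays in place of the page reader and assuming a single-level repeated field (maxDefinitionLevel == 1); the class and helper names are illustrative, not Drill code:

    import java.util.ArrayList;
    import java.util.List;

    class RepeatedLevelDecoder {
      static List<Integer> listLengths(int[] defLevels, int[] repLevels, int maxDef) {
        List<Integer> lengths = new ArrayList<>();
        int i = 0;
        while (i < defLevels.length) {
          if (defLevels[i] < maxDef) {      // definition level below max -> empty list
            lengths.add(0);
            i++;
            continue;
          }
          int valuesInList = 1;             // the entry that started the record
          i++;
          // a repetition level > 0 means "another value in the same list"
          while (i < repLevels.length && repLevels[i] > 0) {
            valuesInList++;
            i++;
          }
          lengths.add(valuesInList);
        }
        return lengths;
      }

      public static void main(String[] args) {
        // records: [7, 8], [], [9]  ->  list lengths 2, 0, 1
        int[] def = {1, 1, 0, 1};
        int[] rep = {0, 1, 0, 0};
        System.out.println(listLengths(def, rep, 1)); // [2, 0, 1]
      }
    }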
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
new file mode 100644
index 0000000..fbf1dee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+/**
+ * This class is used in conjunction with its superclass to read nullable bit columns in a parquet file.
+ * It currently uses an inefficient value-by-value approach.
+ * TODO - make this more efficient by copying runs of values like in NullableFixedByteAlignedReader
+ * This will also involve incorporating the ideas from the BitReader (the reader for non-nullable bits)
+ * because page/batch boundaries that do not land on byte boundaries require shifting of all of the values
+ * in the next batch.
+ */
+final class NullableBitReader extends ColumnReader {
+
+ NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ @Override
+ public void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ int defLevel;
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ defLevel = pageReader.definitionLevels.readInteger();
+ // if the value is defined
+ if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
+ if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
+ pageReader.valueReader.readBoolean() ? 1 : 0 )) {
+ throw new RuntimeException();
+ }
+ }
+ // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
+ }
+ }
+
+}
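
The javadoc above notes that page or batch boundaries that do not land on byte boundaries force the bits of the next batch to be shifted. A small illustration of that stitching for LSB-first packed booleans (plain Java, not Drill or parquet API; the names are illustrative):

    class BitStitcher {
      // completes an output byte that already holds bitsUsed valid low bits in 'leftover'
      // by appending the low bits of the next packed byte
      static int stitch(int leftover, int bitsUsed, int next) {
        return (leftover | (next << bitsUsed)) & 0xFF;
      }

      // the high bits of 'next' that did not fit become the new leftover
      static int newLeftover(int bitsUsed, int next) {
        return (next & 0xFF) >>> (8 - bitsUsed);
      }

      public static void main(String[] args) {
        int leftover = 0b101;        // 3 values already packed (LSB first)
        int next = 0b11110000;       // the next 8 packed values
        System.out.println(Integer.toBinaryString(stitch(leftover, 3, next)));   // 10000101
        System.out.println(Integer.toBinaryString(newLeftover(3, next)));        // 111
      }
    }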
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
new file mode 100644
index 0000000..2babc20
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{
+
+ int nullsFound;
+ // used to skip nulls found
+ int rightBitShift;
+ // used when copying less than a byte's worth of data at a time, to indicate the number of used bits in the current byte
+ int bitsUsed;
+ BaseValueVector castedBaseVector;
+ NullableVectorDefinitionSetter castedVectorMutator;
+
+ NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ castedBaseVector = (BaseValueVector) v;
+ castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
+ }
+
+ public void processPages(long recordsToReadInThisPass) throws IOException {
+ readStartInBytes = 0;
+ readLength = 0;
+ readLengthInBits = 0;
+ recordsReadInThisIteration = 0;
+ vectorData = castedBaseVector.getData();
+
+ do {
+ // if no page has been read, or all of the records have been read out of a page, read the next one
+ if (pageReader.currentPage == null
+ || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
+ if (!pageReader.next()) {
+ break;
+ }
+ }
+
+ // values need to be spaced out where nulls appear in the column
+ // leaving blank space for nulls allows for random access to values
+ // to optimize copying data out of the buffered disk stream, runs of defined values
+ // are located and copied together, rather than copying individual values
+
+ long runStart = pageReader.readPosInBytes;
+ int runLength;
+ int currentDefinitionLevel;
+ int currentValueIndexInVector = (int) recordsReadInThisIteration;
+ boolean lastValueWasNull;
+ int definitionLevelsRead;
+ // loop to find the longest run of defined values available, can be preceded by several nulls
+ while (true){
+ definitionLevelsRead = 0;
+ lastValueWasNull = true;
+ nullsFound = 0;
+ runLength = 0;
+ if (currentValueIndexInVector == recordsToReadInThisPass
+ || currentValueIndexInVector >= valueVec.getValueCapacity()) {
+ break;
+ }
+ while(currentValueIndexInVector < recordsToReadInThisPass
+ && currentValueIndexInVector < valueVec.getValueCapacity()
+ && pageReader.valuesRead + definitionLevelsRead < pageReader.currentPage.getValueCount()){
+ currentDefinitionLevel = pageReader.definitionLevels.readInteger();
+ definitionLevelsRead++;
+ if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
+ // a null was found; if it ends a run of defined values, break out of this loop so the run can be copied in the outer loop
+ nullsFound++;
+ if ( ! lastValueWasNull ){
+ currentValueIndexInVector++;
+ break;
+ }
+ lastValueWasNull = true;
+ }
+ else{
+ if (lastValueWasNull){
+ runStart = pageReader.readPosInBytes;
+ runLength = 0;
+ lastValueWasNull = false;
+ }
+ runLength++;
+ castedVectorMutator.setIndexDefined(currentValueIndexInVector);
+ }
+ currentValueIndexInVector++;
+ }
+ pageReader.readPosInBytes = runStart;
+ recordsReadInThisIteration = runLength;
+
+ readField( runLength);
+ int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
+ if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
+ castedBaseVector.getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
+ }
+ else if (dataTypeLengthInBits < 8){
+ rightBitShift += dataTypeLengthInBits * nullsFound;
+ }
+ recordsReadInThisIteration += nullsFound;
+ valuesReadInCurrentPass += recordsReadInThisIteration;
+ totalValuesRead += recordsReadInThisIteration;
+ pageReader.valuesRead += recordsReadInThisIteration;
+ if ( (readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0)
+ || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
+ if (!pageReader.next()) {
+ break;
+ }
+ } else {
+ pageReader.readPosInBytes = readStartInBytes + readLength;
+ }
+ }
+ } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+ valueVec.getMutator().setValueCount(
+ valuesReadInCurrentPass);
+ }
+
+ protected abstract void readField(long recordsToRead);
+}
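
A stripped-down sketch of the run detection that processPages() performs over definition levels, here on plain arrays: count the nulls that precede the next run of defined values and the length of that run, so the defined values can be copied out of the page contiguously (the class and method names are illustrative, not Drill code):

    class DefinedRunScanner {
      /** Returns {nullsFound, runLength} for the next run of defined values starting at 'from'. */
      static int[] nextRun(int[] defLevels, int maxDef, int from) {
        int nulls = 0;
        int i = from;
        while (i < defLevels.length && defLevels[i] < maxDef) { nulls++; i++; }
        int run = 0;
        while (i < defLevels.length && defLevels[i] == maxDef) { run++; i++; }
        return new int[]{nulls, run};
      }

      public static void main(String[] args) {
        int[] def = {0, 0, 1, 1, 1, 0, 1};
        int[] r = nextRun(def, 1, 0);
        System.out.println(r[0] + " nulls, then a run of " + r[1]); // 2 nulls, then a run of 3
      }
    }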
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
new file mode 100644
index 0000000..c1575de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+
+import org.joda.time.DateTimeUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.math.BigDecimal;
+
+public class NullableFixedByteAlignedReaders {
+
+ static class NullableFixedByteAlignedReader extends NullableColumnReader {
+ protected byte[] bytes;
+
+ NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+ // set up metadata
+ this.readStartInBytes = pageReader.readPosInBytes;
+ this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ this.bytes = pageReader.pageDataByteArray;
+
+ // fill in data.
+ vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
+ }
+ }
+
+ static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+ }
+ }
+ }
+ }
+
+ static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ }
+ }
+ }
+
+ static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ }
+ }
+ }
+
+ static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ }
+ }
+ }
+
+ static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader {
+
+ protected int dataTypeLengthInBytes;
+
+ NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+ // set up metadata
+ this.readStartInBytes = pageReader.readPosInBytes;
+ this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ this.bytes = pageReader.pageDataByteArray;
+
+ dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
+ }
+ }
+
+ abstract void addNext(int start, int index);
+ }
+
+ public static class NullableDateReader extends NullableConvertedReader {
+
+ NullableDateVector dateVector;
+
+ NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ dateVector = (NullableDateVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ }
+
+ // copied out of the parquet library; avoids the unneeded throws clause declared there
+ public static int readIntLittleEndian(byte[] in, int offset) {
+ int ch4 = in[offset] & 0xff;
+ int ch3 = in[offset + 1] & 0xff;
+ int ch2 = in[offset + 2] & 0xff;
+ int ch1 = in[offset + 3] & 0xff;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ }
+
+ public static class NullableDecimal28Reader extends NullableConvertedReader {
+
+ NullableDecimal28SparseVector decimal28Vector;
+
+ NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ decimal28Vector = (NullableDecimal28SparseVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ int width = NullableDecimal28SparseHolder.WIDTH;
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+ DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
+ schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
+ }
+ }
+
+ public static class NullableDecimal38Reader extends NullableConvertedReader {
+
+ NullableDecimal38SparseVector decimal38Vector;
+
+ NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ decimal38Vector = (NullableDecimal38SparseVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ int width = NullableDecimal38SparseHolder.WIDTH;
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+ DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
+ schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
+ }
+ }
+
+}
\ No newline at end of file
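
The decimal readers above rely on DecimalUtility.getBigDecimalFromByteArray to turn a fixed-length byte slice into a BigDecimal. Parquet stores such decimals as big-endian two's-complement unscaled integers, so a plain-Java equivalent is presumably along these lines (a hedged sketch, not the Drill utility itself):

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Arrays;

    class FixedDecimalDecode {
      static BigDecimal decode(byte[] page, int start, int length, int scale) {
        // big-endian two's-complement unscaled value, with the column's scale applied
        BigInteger unscaled = new BigInteger(Arrays.copyOfRange(page, start, start + length));
        return new BigDecimal(unscaled, scale);
      }

      public static void main(String[] args) {
        // 16-byte fixed-length value holding 123456 with scale 2 -> 1234.56
        byte[] page = new byte[16];
        byte[] raw = BigInteger.valueOf(123456).toByteArray();
        System.arraycopy(raw, 0, page, 16 - raw.length, raw.length);
        System.out.println(decode(page, 0, 16, 2)); // 1234.56
      }
    }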
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
new file mode 100644
index 0000000..2be9a37
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -0,0 +1,130 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> {
+
+ int nullsRead;
+ boolean currentValNull = false;
+
+ NullableVarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ public abstract boolean setSafe(int index, byte[] value, int start, int length);
+
+ public abstract int capacity();
+
+ public void reset() {
+ bytesReadInCurrentPass = 0;
+ valuesReadInCurrentPass = 0;
+ nullsRead = 0;
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected void postPageRead() {
+ currLengthDeterminingDictVal = null;
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected boolean readAndStoreValueSizeInformation() throws IOException {
+ // we need to read all of the lengths to determine if this value will fit in the current vector;
+ // because we can only read each definition level once, we have to store the last one, as we will need it
+ // at the start of the next read if we decide, after reading all of the varlength values in this record,
+ // that it will not fit in this batch
+ currentValNull = false;
+ if ( currDefLevel == -1 ) {
+ currDefLevel = pageReader.definitionLevels.readInteger();
+ }
+ if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel){
+ nullsRead++;
+ // set a length of zero; each index in the vector defaults to null, so there is no need to set the nullability
+ variableWidthVector.getMutator().setValueLengthSafe(
+ valuesReadInCurrentPass + pageReader.valuesReadyToRead, 0);
+ currentValNull = true;
+ return false;// field is null, no length to add to data vector
+ }
+
+ if (usingDictionary) {
+ if (currLengthDeterminingDictVal == null) {
+ currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes();
+ }
+ currDictValToWrite = currLengthDeterminingDictVal;
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ dataTypeLengthInBits = currLengthDeterminingDictVal.length();
+ }
+ else {
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+ (int) pageReader.readyToReadPosInBytes);
+ }
+ // I think this also needs to happen if it is null for the random access
+ if (! variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, dataTypeLengthInBits)) {
+ return true;
+ }
+ boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray,
+ (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
+ assert success;
+ return false;
+ }
+
+ public void updateReadyToReadPosition() {
+ if (! currentValNull){
+ pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4;
+ }
+ pageReader.valuesReadyToRead++;
+ currLengthDeterminingDictVal = null;
+ }
+
+ public void updatePosition() {
+ if (! currentValNull){
+ pageReader.readPosInBytes += dataTypeLengthInBits + 4;
+ bytesReadInCurrentPass += dataTypeLengthInBits;
+ }
+ currentValNull = false;
+ valuesReadInCurrentPass++;
+ }
+
+ @Override
+ protected void readField(long recordsToRead) {
+ if (usingDictionary) {
+ currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ }
+ dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
+ currentValNull = variableWidthVector.getAccessor().getObject(valuesReadInCurrentPass) == null;
+ // again, I am re-purposing the unused field here; it is a length in BYTES, not bits
+ if (! currentValNull){
+ boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray,
+ (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
+ assert success;
+ }
+ updatePosition();
+ }
+}
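readAndStoreValueSizeInformation() above walks PLAIN-encoded variable-length data, where each defined value is stored as a 4-byte little-endian length followed by that many bytes; that is why the read positions advance by length + 4. A self-contained sketch of that layout (illustrative names; UTF-8 strings assumed for readability):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    class PlainVarLenDecode {
      static List<String> decode(byte[] pageData, int valueCount) {
        List<String> out = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(pageData).order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < valueCount; i++) {
          int len = buf.getInt();                 // 4-byte little-endian length prefix
          byte[] value = new byte[len];
          buf.get(value);                         // the value bytes themselves
          out.add(new String(value, StandardCharsets.UTF_8));
        }
        return out;
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(3).put("foo".getBytes(StandardCharsets.UTF_8));
        buf.putInt(5).put("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(buf.array(), 2)); // [foo, hello]
      }
    }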
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
new file mode 100644
index 0000000..1d300bb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.bytes.BytesInput;
+import parquet.column.Dictionary;
+import parquet.column.ValuesType;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.Page;
+import parquet.column.values.ValuesReader;
+import parquet.column.values.dictionary.DictionaryValuesReader;
+import parquet.format.PageHeader;
+import parquet.format.PageType;
+import parquet.format.Util;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+// reads pages for a single column chunk and keeps track of the read position within the current page
+final class PageReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
+
+ private final ColumnReader parentColumnReader;
+ private final ColumnDataReader dataReader;
+ // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
+ Page currentPage;
+ // buffer to store bytes of current page
+ byte[] pageDataByteArray;
+
+ // for variable length data we need to keep track of our current position in the page data
+ // as the values and lengths are intermixed, making random access to the length data impossible
+ long readyToReadPosInBytes;
+ // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
+ long readPosInBytes;
+ // bit shift needed for the next page if the last one did not line up with a byte boundary
+ int bitShift;
+ // storage space for extra bits at the end of a page if they did not line up with a byte boundary
+ // prevents the need to keep the entire last page, as these extra bits need to be added to the next batch
+ //byte extraBits;
+
+ // used for columns where the number of values that will fit in a vector is unknown
+ // currently used for variable length
+ // TODO - reuse this when compressed vectors are added, where fixed length values will take up a
+ // variable amount of space
+ // For example: if nulls are stored without extra space left in the data vector
+ // (this is currently simplifying random access to the data during processing, but increases the size of the vectors)
+ int valuesReadyToRead;
+
+ // the number of values read out of the last page
+ int valuesRead;
+ int byteLength;
+ //int rowGroupIndex;
+ ValuesReader definitionLevels;
+ ValuesReader repetitionLevels;
+ ValuesReader valueReader;
+ ValuesReader dictionaryLengthDeterminingReader;
+ ValuesReader dictionaryValueReader;
+ Dictionary dictionary;
+ PageHeader pageHeader = null;
+
+ PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
+ this.parentColumnReader = parentStatus;
+
+ long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
+ long start = columnChunkMetaData.getFirstDataPageOffset();
+ try {
+ FSDataInputStream f = fs.open(path);
+ this.dataReader = new ColumnDataReader(f, start, totalByteLength);
+ if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+ f.seek(columnChunkMetaData.getDictionaryPageOffset());
+ PageHeader pageHeader = Util.readPageHeader(f);
+ assert pageHeader.type == PageType.DICTIONARY_PAGE;
+ BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+ .decompress( //
+ dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+ pageHeader.getUncompressed_page_size(), //
+ parentColumnReader.columnChunkMetaData.getCodec());
+ DictionaryPage page = new DictionaryPage(
+ bytesIn,
+ pageHeader.uncompressed_page_size,
+ pageHeader.dictionary_page_header.num_values,
+ parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+ );
+ this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+ }
+ } catch (IOException e) {
+ throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + path.getName(), e);
+ }
+
+ }
+
+
+ /**
+ * Grab the next page.
+ *
+ * @return - if another page was present
+ * @throws java.io.IOException
+ */
+ public boolean next() throws IOException {
+
+ currentPage = null;
+ valuesRead = 0;
+ valuesReadyToRead = 0;
+
+ // TODO - the metadata for total size appears to be incorrect for impala-generated files; need to find the cause
+ // and submit a bug report
+ if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+ return false;
+ }
+
+ // next, we need to decompress the bytes
+ // TODO - figure out if we need to handle multiple dictionary pages; I believe it may be limited to one per column chunk.
+ // If there can be multiple dictionary pages, I think we are clobbering parts of the dictionary here
+ do {
+ pageHeader = dataReader.readPageHeader();
+ if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+ BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+ .decompress( //
+ dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+ pageHeader.getUncompressed_page_size(), //
+ parentColumnReader.columnChunkMetaData.getCodec());
+ DictionaryPage page = new DictionaryPage(
+ bytesIn,
+ pageHeader.uncompressed_page_size,
+ pageHeader.dictionary_page_header.num_values,
+ parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+ );
+ this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
+ }
+ } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+
+ BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+ .decompress( //
+ dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+ pageHeader.getUncompressed_page_size(), //
+ parentColumnReader.columnChunkMetaData.getCodec());
+ currentPage = new Page(
+ bytesIn,
+ pageHeader.data_page_header.num_values,
+ pageHeader.uncompressed_page_size,
+ ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+ ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+ ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+ );
+
+ byteLength = pageHeader.uncompressed_page_size;
+
+ if (currentPage == null) {
+ return false;
+ }
+
+ pageDataByteArray = currentPage.getBytes().toByteArray();
+
+ readPosInBytes = 0;
+ if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+ repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+ repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ // we know that the first repetition level will be 0; at the end of each list of repeated values we will hit another 0,
+ // indicating a new record, although we don't know the list length until we hit it (this is a one-way stream of integers).
+ // We read that first zero here to simplify the reading process and start reading the first value the same way as all
+ // of the rest. Effectively we are 'reading' the non-existent value in front of the first, allowing direct access to
+ // the first list of repetition levels
+ readPosInBytes = repetitionLevels.getNextOffset();
+ repetitionLevels.readInteger();
+ }
+ if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+ parentColumnReader.currDefLevel = -1;
+ if (!currentPage.getValueEncoding().usesDictionary()) {
+ parentColumnReader.usingDictionary = false;
+ definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+ definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ readPosInBytes = definitionLevels.getNextOffset();
+ if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+ valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+ valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ }
+ } else {
+ parentColumnReader.usingDictionary = true;
+ definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+ definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ readPosInBytes = definitionLevels.getNextOffset();
+ // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
+ // actually copying the values out into the vectors
+ dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
+ dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ dictionaryValueReader = new DictionaryValuesReader(dictionary);
+ dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ this.parentColumnReader.usingDictionary = true;
+ }
+ }
+ // readPosInBytes is used for actually reading the values after we determine how many will fit in the vector
+ // readyToReadPosInBytes serves a similar purpose for the vector types where we must count up the values that will
+ // fit one record at a time, such as for variable length data. Both operations must start in the same location after the
+ // definition and repetition level data which is stored alongside the page data itself
+ readyToReadPosInBytes = readPosInBytes;
+ return true;
+ }
+
+ public void clear(){
+ this.dataReader.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
new file mode 100644
index 0000000..ad849b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ParquetFixedWidthDictionaryReader extends ColumnReader{
+
+ ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ @Override
+ public void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ int defLevel;
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ defLevel = pageReader.definitionLevels.readInteger();
+ // if the value is defined
+ if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
+ if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
+ ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
+ pageReader.valueReader.readLong() );
+ }
+ // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
+ }
+ }
+}
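The readField loop above writes a value only when the record's definition level equals the column's maximum definition level; for any lower level the slot is simply skipped and reads as null, because the nullability bits default to zero. The following is a minimal sketch of that gating, using plain arrays in place of the Parquet readers and Drill vectors; all names here are illustrative only.

// A minimal sketch of the definition-level gating above, assuming a simplified in-memory
// layout; the arrays and names (defLevels, encodedValues, MAX_DEF_LEVEL) are illustrative only.
public class DefinitionLevelSketch {
  public static void main(String[] args) {
    final int MAX_DEF_LEVEL = 1;             // a value is present only at the max definition level
    int[] defLevels = {1, 0, 1, 1, 0};       // one entry per record in the page
    long[] encodedValues = {10L, 20L, 30L};  // only defined values appear in the data stream

    long[] vector = new long[defLevels.length];   // target slots, zero-filled like the null bitmap
    boolean[] isSet = new boolean[defLevels.length];

    int valueIndex = 0;
    for (int i = 0; i < defLevels.length; i++) {
      if (defLevels[i] == MAX_DEF_LEVEL) {
        // defined: consume the next value from the data stream and mark the slot
        vector[i] = encodedValues[valueIndex++];
        isSet[i] = true;
      }
      // otherwise the slot is skipped and stays null, since nothing marks it as set
    }
    for (int i = 0; i < vector.length; i++) {
      System.out.println(i + ": " + (isSet[i] ? Long.toString(vector[i]) : "null"));
    }
  }
}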
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
new file mode 100644
index 0000000..2228787
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.PrimitiveType;
+
+public class ParquetRecordReader implements RecordReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+
+ // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
+ private static final int NUMBER_OF_VECTORS = 1;
+ private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
+ private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb expressed in bits
+ private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+
+ // TODO - should probably find a smarter way to set this, currently 1 megabyte
+ private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
+ public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
+ private static final String SEPERATOR = System.getProperty("file.separator");
+
+ // masks used for clearing the last n bits of a byte; index n-1 clears the last n bits
+ public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
+ // masks used for clearing the first n bits of a byte; index n-1 clears the first n bits
+ public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
+
+ private int bitWidthAllFixedFields;
+ private boolean allFieldsFixedLength;
+ private int recordsPerBatch;
+ private long totalRecords;
+ private long rowGroupOffset;
+
+ private List<ColumnReader> columnStatuses;
+ private FileSystem fileSystem;
+ private long batchSize;
+ Path hadoopPath;
+ private VarLenBinaryReader varLengthReader;
+ private ParquetMetadata footer;
+ private List<SchemaPath> columns;
+ private final CodecFactoryExposer codecFactoryExposer;
+ int rowGroupIndex;
+
+ public ParquetRecordReader(FragmentContext fragmentContext, //
+ String path, //
+ int rowGroupIndex, //
+ FileSystem fs, //
+ CodecFactoryExposer codecFactoryExposer, //
+ ParquetMetadata footer, //
+ List<SchemaPath> columns) throws ExecutionSetupException {
+ this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+ columns);
+ }
+
+ public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
+ String path, int rowGroupIndex, FileSystem fs,
+ CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
+ List<SchemaPath> columns) throws ExecutionSetupException {
+ hadoopPath = new Path(path);
+ fileSystem = fs;
+ this.codecFactoryExposer = codecFactoryExposer;
+ this.rowGroupIndex = rowGroupIndex;
+ this.batchSize = batchSize;
+ this.footer = footer;
+ this.columns = columns;
+ }
+
+ public CodecFactoryExposer getCodecFactoryExposer() {
+ return codecFactoryExposer;
+ }
+
+ public Path getHadoopPath() {
+ return hadoopPath;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ public int getRowGroupIndex() {
+ return rowGroupIndex;
+ }
+
+ public int getBitWidthAllFixedFields() {
+ return bitWidthAllFixedFields;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * @param type a fixed length type from the parquet library enum
+ * @return the length of the type in bits
+ */
+ public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
+ switch (type) {
+ case INT64: return 64;
+ case INT32: return 32;
+ case BOOLEAN: return 1;
+ case FLOAT: return 32;
+ case DOUBLE: return 64;
+ case INT96: return 96;
+ // binary and fixed length byte array
+ default:
+ throw new IllegalStateException("Length cannot be determined for type " + type);
+ }
+ }
+
+ private boolean fieldSelected(MaterializedField field){
+ // TODO - not sure if this is how we want to represent this
+ // for now it makes the existing tests pass, simply selecting
+ // all available data if no columns are provided
+ if (this.columns != null){
+ for (SchemaPath expr : this.columns){
+ if ( field.matches(expr)){
+ return true;
+ }
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+
+ columnStatuses = new ArrayList<>();
+ totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+ List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+ allFieldsFixedLength = true;
+ ColumnDescriptor column;
+ ColumnChunkMetaData columnChunkMetaData;
+ int columnsToScan = 0;
+
+ MaterializedField field;
+ ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+ FileMetaData fileMetaData;
+
+ // TODO - figure out how to deal with this better once we add nested reading; also note where this map is used below
+ // store a map from column name to converted types if they are non-null
+ HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+ fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+ for (SchemaElement se : fileMetaData.getSchema()) {
+ schemaElements.put(se.getName(), se);
+ }
+
+ // loop to add up the length of the fixed width columns and build the schema
+ for (int i = 0; i < columns.size(); ++i) {
+ column = columns.get(i);
+ logger.debug("name: " + fileMetaData.getSchema().get(i).name);
+ SchemaElement se = schemaElements.get(column.getPath()[0]);
+ MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
+ field = MaterializedField.create(toFieldName(column.getPath()),mt);
+ if ( ! fieldSelected(field)){
+ continue;
+ }
+ columnsToScan++;
+ // sum the lengths of all of the fixed length fields
+ if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ if (column.getMaxRepetitionLevel() > 0) {
+ allFieldsFixedLength = false;
+ }
+ // There is no support for the fixed binary type yet in parquet; leaving a task here as a reminder
+ // TODO - implement this when the feature is added upstream
+ if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+ bitWidthAllFixedFields += se.getType_length() * 8;
+ } else {
+ bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+ }
+ } else {
+ allFieldsFixedLength = false;
+ }
+ }
+ rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+ // none of the columns in the parquet file matched the requested columns from the query
+ if (columnsToScan == 0){
+ throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
+ }
+ if (allFieldsFixedLength) {
+ recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+ footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+ }
+ else {
+ recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+ }
+
+ try {
+ ValueVector v;
+ ConvertedType convertedType;
+ SchemaElement schemaElement;
+ ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+ // initialize all of the column read status objects
+ boolean fieldFixedLength = false;
+ for (int i = 0; i < columns.size(); ++i) {
+ column = columns.get(i);
+ columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
+ schemaElement = schemaElements.get(column.getPath()[0]);
+ convertedType = schemaElement.getConverted_type();
+ MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
+ field = MaterializedField.create(toFieldName(column.getPath()), type);
+ // the field was not requested to be read
+ if ( ! fieldSelected(field)) continue;
+
+ fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+ v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ if (column.getMaxRepetitionLevel() > 0) {
+ ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch,
+ ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
+ varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
+ getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
+ }
+ else {
+ columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+ schemaElement));
+ }
+ } else {
+ // create a reader and add it to the appropriate list
+ varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement));
+ }
+ }
+ varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+ } catch (SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ } catch (Exception e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private SchemaPath toFieldName(String[] paths) {
+ return SchemaPath.getCompoundPath(paths);
+ }
+
+ private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+ if (column.getMaxRepetitionLevel() > 0 ) {
+ return DataMode.REPEATED;
+ } else if (column.getMaxDefinitionLevel() == 0) {
+ return TypeProtos.DataMode.REQUIRED;
+ } else {
+ return TypeProtos.DataMode.OPTIONAL;
+ }
+ }
+
+ private void resetBatch() {
+ for (ColumnReader column : columnStatuses) {
+ column.valuesReadInCurrentPass = 0;
+ }
+ for (VarLengthColumn r : varLengthReader.columns){
+ r.valuesReadInCurrentPass = 0;
+ }
+ }
+
+ public void readAllFixedFields(long recordsToRead) throws IOException {
+
+ for (ColumnReader crs : columnStatuses){
+ crs.processPages(recordsToRead);
+ }
+ }
+
+ @Override
+ public int next() {
+ resetBatch();
+ long recordsToRead = 0;
+ try {
+ ColumnReader firstColumnStatus;
+ if (columnStatuses.size() > 0){
+ firstColumnStatus = columnStatuses.iterator().next();
+ }
+ else{
+ if (varLengthReader.columns.size() > 0){
+ firstColumnStatus = varLengthReader.columns.iterator().next();
+ }
+ else{
+ firstColumnStatus = null;
+ }
+ }
+ // TODO - replace this with new functionality that returns batches even if no columns are selected;
+ // the query 'select 5 from parquetfile' should return the number of records that the parquet file contains.
+ // We don't need to read any of the data; we just need to fill batches with a record count and a placeholder
+ // vector with the right number of values
+ if (firstColumnStatus == null) throw new DrillRuntimeException("Unexpected error reading parquet file, not reading any columns");
+
+ if (allFieldsFixedLength) {
+ recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+ } else {
+ recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+
+ }
+
+ if (allFieldsFixedLength) {
+ readAllFixedFields(recordsToRead);
+ } else { // variable length columns
+ long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+ readAllFixedFields(fixedRecordsToRead);
+ }
+
+ return firstColumnStatus.getRecordsReadInCurrentPass();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ for (ColumnReader column : columnStatuses) {
+ column.clear();
+ }
+ columnStatuses.clear();
+
+ for (VarLengthColumn r : varLengthReader.columns){
+ r.clear();
+ }
+ varLengthReader.columns.clear();
+ }
+}
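One detail worth calling out from setup() above is the batch sizing for all-fixed-width files: the bit budget for a batch is divided by the combined bit width of the selected columns, then bounded by the value count of the first column chunk and by 65535. A small worked example of that arithmetic follows; the column widths and row count are illustrative only.

// A small worked example of the batch-size arithmetic used in setup() when every selected
// column is fixed width; the column widths and row count below are illustrative only.
public class BatchSizeSketch {
  public static void main(String[] args) {
    long batchSizeInBits = 256 * 1024 * 8;    // the default batch length, expressed in bits
    int bitWidthAllFixedFields = 64 + 32 + 1; // e.g. one INT64, one INT32 and one BOOLEAN column
    long valueCountInRowGroup = 1_000_000L;   // rows available in the first column chunk

    // records per batch = bit budget / bits per record, bounded by the row count and by 65535
    int recordsPerBatch = (int) Math.min(
        Math.min(batchSizeInBits / bitWidthAllFixedFields, valueCountInRowGroup), 65535);

    System.out.println("recordsPerBatch = " + recordsPerBatch); // prints 21620 with these numbers
  }
}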