drill-commits mailing list archives

From jacq...@apache.org
Subject [12/17] DRILL-945: Implementation of repeated reader and writer for parquet. Includes a fairly substantial refactoring of the overall reader structure.
Date Tue, 29 Jul 2014 15:38:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
new file mode 100644
index 0000000..4513aaa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.DateVector;
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.math.BigDecimal;
+
+class FixedByteAlignedReader extends ColumnReader {
+
+  protected byte[] bytes;
+
+  
+  FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                         boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+  }
+
+  // this method is called by its superclass during a read loop
+  @Override
+  protected void readField(long recordsToReadInThisPass) {
+
+    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+    readStartInBytes = pageReader.readPosInBytes;
+    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+    readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+    bytes = pageReader.pageDataByteArray;
+    // vectorData is assigned by the superclass read loop method
+    writeData();
+  }
+
+  protected void writeData() {
+    vectorData.writeBytes(bytes,
+        (int) readStartInBytes, (int) readLength);
+  }
+
+  public static abstract class ConvertedReader extends FixedByteAlignedReader {
+
+    protected int dataTypeLengthInBytes;
+
+    ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                           boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    public void writeData() {
+      dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
+      for (int i = 0; i < recordsReadInThisIteration; i++) {
+        addNext((int)readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
+      }
+    }
+
+    /**
+     * Reads from bytes, converts, and writes to buffer
+     * @param start the index in bytes to start reading from
+     * @param index the index of the ValueVector
+     */
+    abstract void addNext(int start, int index);
+  }
+
+  public static class DateReader extends ConvertedReader {
+
+    DateVector dateVector;
+
+    DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = (DateVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+          NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytes, start)
+              - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+    }
+  }
+
+  public static class Decimal28Reader extends ConvertedReader {
+
+    Decimal28SparseVector decimal28Vector;
+
+    Decimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      decimal28Vector = (Decimal28SparseVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int width = Decimal28SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
+    }
+  }
+
+  public static class Decimal38Reader extends ConvertedReader {
+
+    Decimal38SparseVector decimal38Vector;
+
+    Decimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      decimal38Vector = (Decimal38SparseVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int width = Decimal38SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
+    }
+  }
+}
\ No newline at end of file
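
Note: the DateReader above converts a Parquet date, treated here as a Julian day number read from four little-endian bytes, into the millisecond value the DateVector mutator expects; the 0.5 offset accounts for Julian days beginning at noon. A minimal sketch of the underlying arithmetic (assuming a plain Julian day number and the usual epoch constant 2440588 for 1970-01-01; hypothetical helper, not Drill's API):

    // convert a Julian day number to milliseconds since the Unix epoch (midnight UTC)
    static long julianDayToEpochMillis(int julianDay) {
      final int JULIAN_DAY_OF_EPOCH = 2440588;          // Julian day number of 1970-01-01
      long daysSinceEpoch = julianDay - JULIAN_DAY_OF_EPOCH;
      return daysSinceEpoch * 24L * 60L * 60L * 1000L;  // 86,400,000 ms per day
    }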

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
new file mode 100644
index 0000000..bbff574
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -0,0 +1,213 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+public class FixedWidthRepeatedReader extends VarLengthColumn {
+
+  RepeatedFixedWidthVector castedRepeatedVector;
+  ColumnReader dataReader;
+  int dataTypeLengthInBytes;
+  // we can do a vector copy of the data once we figure out how much we need to copy
+  // this tracks the number of values to transfer (the dataReader will translate this to a number
+  // of bytes to transfer and re-use the code from the non-repeated types)
+  int valuesToRead;
+  int repeatedGroupsReadInCurrentPass;
+  int repeatedValuesInCurrentList;
+  // empty lists are denoted by definition levels; to stop reading at the correct time, we must keep
+  // track of the number of empty lists as well as the combined length of all of the defined lists
+  int definitionLevelsRead;
+  // parquet currently does not prevent lists from reaching across pages for repeated values; this necessitates
+  // tracking when it happens, so that some of the state updates can be deferred until we know the full length of
+  // the repeated value for the current record
+  boolean notFishedReadingList;
+  byte[] leftOverBytes;
+
+  FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
+    castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
+    this.dataTypeLengthInBytes = dataTypeLengthInBytes;
+    this.dataReader = dataReader;
+    this.dataReader.pageReader = this.pageReader;
+    // this is not in the reset method because it needs to be initialized only before the very first page read;
+    // in all other cases, if a read ends at a page boundary, we need to keep track of this flag and not
+    // clear it at the start of the next read loop
+    notFishedReadingList = false;
+  }
+
+  public void reset() {
+    bytesReadInCurrentPass = 0;
+    valuesReadInCurrentPass = 0;
+    pageReader.valuesReadyToRead = 0;
+    dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getData();
+    dataReader.valuesReadInCurrentPass = 0;
+    repeatedGroupsReadInCurrentPass = 0;
+  }
+
+  public int getRecordsReadInCurrentPass() {
+    return repeatedGroupsReadInCurrentPass;
+  }
+
+  @Override
+  protected void readField(long recordsToRead) {
+    // no-op: this reader copies its data through readRecords(), driven by readAndStoreValueSizeInformation()
+  }
+
+  public boolean skipReadyToReadPositionUpdate() {
+    return false;
+  }
+
+  public void updateReadyToReadPosition() {
+    valuesToRead += repeatedValuesInCurrentList;
+    pageReader.valuesReadyToRead += repeatedValuesInCurrentList;
+    repeatedGroupsReadInCurrentPass++;
+    currDictVal = null;
+    if ( ! notFishedReadingList)
+      repeatedValuesInCurrentList = -1;
+  }
+
+  public void updatePosition() {
+    pageReader.readPosInBytes += dataTypeLengthInBits;
+    bytesReadInCurrentPass += dataTypeLengthInBits;
+    valuesReadInCurrentPass++;
+  }
+
+  public void hitRowGroupEnd() {
+    pageReader.valuesReadyToRead = 0;
+    definitionLevelsRead = 0;
+  }
+
+  public void postPageRead() {
+    super.postPageRead();
+    // this is no longer correct as we figured out that lists can reach across pages
+    if ( ! notFishedReadingList)
+      repeatedValuesInCurrentList = -1;
+    definitionLevelsRead = 0;
+  }
+
+  protected int totalValuesReadAndReadyToReadInPage() {
+    // we need to prevent the page reader from getting rid of the current page in the case where we have a repeated
+    // value split across a page boundary
+    if (notFishedReadingList) {
+      return definitionLevelsRead - repeatedValuesInCurrentList;
+    }
+    return definitionLevelsRead;
+  }
+
+  protected boolean checkVectorCapacityReached() {
+    boolean doneReading = super.checkVectorCapacityReached();
+    if (doneReading)
+      return true;
+    if (valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity())
+      return true;
+    else
+      return false;
+  }
+
+  protected boolean readAndStoreValueSizeInformation() {
+    boolean readingValsAcrossPageBoundary = false;
+    int numLeftoverVals = 0;
+    if (notFishedReadingList) {
+      numLeftoverVals = repeatedValuesInCurrentList;
+      readRecords(numLeftoverVals);
+      readingValsAcrossPageBoundary = true;
+      notFishedReadingList = false;
+      pageReader.valuesReadyToRead = 0;
+      try {
+        boolean stopReading = readPage();
+        if (stopReading) {
+          // hit the end of a row group
+          return false;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Unexpected error reading parquet repeated column.", e);
+      }
+    }
+    if ( currDefLevel == -1 ) {
+      currDefLevel = pageReader.definitionLevels.readInteger();
+      definitionLevelsRead++;
+    }
+    int repLevel;
+    if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel){
+      if (repeatedValuesInCurrentList == -1 || notFishedReadingList) {
+        repeatedValuesInCurrentList = 1;
+        do {
+          repLevel = pageReader.repetitionLevels.readInteger();
+          if (repLevel > 0) {
+            repeatedValuesInCurrentList++;
+            currDefLevel = pageReader.definitionLevels.readInteger();
+            definitionLevelsRead++;
+
+            // we hit the end of this page, without confirmation that we reached the end of the current record
+            if (definitionLevelsRead == pageReader.currentPage.getValueCount()) {
+              // check that we have not hit the end of the row group (in which case we will not find the repetition level indicating
+              // the end of this record, as there is no next page to check; we have read all the values in this repetition, so it is okay
+              // to add it to the read)
+              if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()){
+                notFishedReadingList = true;
+                // if we hit this case, we cut off the current batch at the previous value; these extra values, as well
+                // as those that spill into the next page, will be added to the next batch
+                return true;
+              }
+            }
+          }
+        } while (repLevel != 0);
+      }
+    }
+    else {
+      repeatedValuesInCurrentList = 0;
+    }
+    int currentValueListLength = repeatedValuesInCurrentList;
+    if (readingValsAcrossPageBoundary) {
+      currentValueListLength += numLeftoverVals;
+    }
+    // this should not fail
+    if (!castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
+        currentValueListLength)) {
+      return true;
+    }
+    // This field is referenced in the superclass determineSize method, so we need to set it here;
+    // it is again re-purposed as a length in BYTES to avoid repetitive multiplication/division
+    dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;
+    return false;
+  }
+
+  protected void readRecords(int valuesToRead) {
+    if (valuesToRead == 0) return;
+    // TODO - validate that this works in all cases; it fixes a bug when reading from multiple pages into
+    // a single vector
+    dataReader.valuesReadInCurrentPass = 0;
+    dataReader.readValues(valuesToRead);
+    valuesReadInCurrentPass += valuesToRead;
+    castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass);
+  }
+
+  @Override
+  public int capacity() {
+    return castedRepeatedVector.getMutator().getDataVector().getData().capacity();
+  }
+}
+
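
Note: readAndStoreValueSizeInformation above walks repetition and definition levels to find how many values belong to each repeated list: a repetition level of 0 starts a new record, and a definition level below the maximum marks an empty list. A minimal standalone sketch of that bookkeeping for a single-level repeated column (hypothetical helper, not Drill's API):

    import java.util.ArrayList;
    import java.util.List;

    // compute the length of each record's list from repetition/definition levels
    static List<Integer> listLengths(int[] repLevels, int[] defLevels, int maxDefLevel) {
      List<Integer> lengths = new ArrayList<>();
      for (int i = 0; i < repLevels.length; i++) {
        if (repLevels[i] == 0) {                            // a new record starts here
          lengths.add(defLevels[i] < maxDefLevel ? 0 : 1);  // empty list vs. first element
        } else {                                            // another element of the current list
          int last = lengths.size() - 1;
          lengths.set(last, lengths.get(last) + 1);
        }
      }
      return lengths;
    }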

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
new file mode 100644
index 0000000..fbf1dee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+/**
+ * This class is used in conjunction with its superclass to read nullable bit columns in a parquet file.
+ * It currently uses an inefficient value-by-value approach.
+ * TODO - make this more efficient by copying runs of values like in NullableFixedByteAlignedReader
+ * This will also involve incorporating the ideas from the BitReader (the reader for non-nullable bits)
+ * because page/batch boundaries that do not land on byte boundaries require shifting of all of the values
+ * in the next batch.
+ */
+final class NullableBitReader extends ColumnReader {
+
+  NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+  }
+
+  @Override
+  public void readField(long recordsToReadInThisPass) {
+
+    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+    int defLevel;
+    for (int i = 0; i < recordsReadInThisIteration; i++){
+      defLevel = pageReader.definitionLevels.readInteger();
+      // if the value is defined
+      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
+        if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
+            pageReader.valueReader.readBoolean() ? 1 : 0 )) {
+          throw new RuntimeException();
+        }
+      }
+      // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
new file mode 100644
index 0000000..2babc20
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{
+
+  int nullsFound;
+  // used to skip nulls found
+  int rightBitShift;
+  // used when copying less than a byte's worth of data at a time, to indicate the number of used bits in the current byte
+  int bitsUsed;
+  BaseValueVector castedBaseVector;
+  NullableVectorDefinitionSetter castedVectorMutator;
+
+  NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+               boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    castedBaseVector = (BaseValueVector) v;
+    castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
+  }
+
+  public void processPages(long recordsToReadInThisPass) throws IOException {
+    readStartInBytes = 0;
+    readLength = 0;
+    readLengthInBits = 0;
+    recordsReadInThisIteration = 0;
+    vectorData = castedBaseVector.getData();
+
+    do {
+      // if no page has been read, or all of the records have been read out of a page, read the next one
+      if (pageReader.currentPage == null
+          || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
+        if (!pageReader.next()) {
+          break;
+        }
+      }
+
+      // values need to be spaced out where nulls appear in the column
+      // leaving blank space for nulls allows for random access to values
+      // to optimize copying data out of the buffered disk stream, runs of defined values
+      // are located and copied together, rather than copying individual values
+
+      long runStart = pageReader.readPosInBytes;
+      int runLength;
+      int currentDefinitionLevel;
+      int currentValueIndexInVector = (int) recordsReadInThisIteration;
+      boolean lastValueWasNull;
+      int definitionLevelsRead;
+      // loop to find the longest run of defined values available; it can be preceded by several nulls
+      while (true){
+        definitionLevelsRead = 0;
+        lastValueWasNull = true;
+        nullsFound = 0;
+        runLength = 0;
+        if (currentValueIndexInVector == recordsToReadInThisPass
+            || currentValueIndexInVector >= valueVec.getValueCapacity()) {
+          break;
+        }
+        while(currentValueIndexInVector < recordsToReadInThisPass
+            && currentValueIndexInVector < valueVec.getValueCapacity()
+            && pageReader.valuesRead + definitionLevelsRead < pageReader.currentPage.getValueCount()){
+          currentDefinitionLevel = pageReader.definitionLevels.readInteger();
+          definitionLevelsRead++;
+          if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
+            // this value is null; if it ends a run of defined values, break out of this loop so the run can be copied in the outer loop
+            nullsFound++;
+            if ( ! lastValueWasNull ){
+              currentValueIndexInVector++;
+              break;
+            }
+            lastValueWasNull = true;
+          }
+          else{
+            if (lastValueWasNull){
+              runStart = pageReader.readPosInBytes;
+              runLength = 0;
+              lastValueWasNull = false;
+            }
+            runLength++;
+            castedVectorMutator.setIndexDefined(currentValueIndexInVector);
+          }
+          currentValueIndexInVector++;
+        }
+        pageReader.readPosInBytes = runStart;
+        recordsReadInThisIteration = runLength;
+
+        readField( runLength);
+        int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
+        if ( dataTypeLengthInBits > 8  || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
+          castedBaseVector.getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
+        }
+        else if (dataTypeLengthInBits < 8){
+          rightBitShift += dataTypeLengthInBits * nullsFound;
+        }
+        recordsReadInThisIteration += nullsFound;
+        valuesReadInCurrentPass += recordsReadInThisIteration;
+        totalValuesRead += recordsReadInThisIteration;
+        pageReader.valuesRead += recordsReadInThisIteration;
+        if ( (readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0)
+            || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
+          if (!pageReader.next()) {
+            break;
+          }
+        } else {
+          pageReader.readPosInBytes = readStartInBytes + readLength;
+        }
+      }
+    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+    valueVec.getMutator().setValueCount(
+        valuesReadInCurrentPass);
+  }
+
+  protected abstract void readField(long recordsToRead);
+}
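
Note: processPages above avoids copying value-by-value: it scans definition levels to find runs of consecutive defined values, copies each run in one readField call, and then advances the vector's writer index past the slots left blank for nulls so values stay addressable by index. A minimal sketch of that idea for a fixed-width type, using plain arrays instead of Drill's vectors (hypothetical, for illustration only):

    // copy runs of defined values into 'out', leaving gaps for nulls so column index i
    // lands at out[i]; 'defined[i]' says whether value i is non-null
    static void copyWithGaps(long[] packedValues, boolean[] defined, long[] out) {
      int src = 0;                       // next value in the packed (nulls removed) input
      for (int i = 0; i < defined.length; i++) {
        if (defined[i]) {
          int runStart = i;
          while (i < defined.length && defined[i]) {
            i++;                         // extend the run of defined values
          }
          int runLength = i - runStart;
          System.arraycopy(packedValues, src, out, runStart, runLength);
          src += runLength;
        }
      }
    }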

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
new file mode 100644
index 0000000..c1575de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+
+import org.joda.time.DateTimeUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.math.BigDecimal;
+
+public class NullableFixedByteAlignedReaders {
+
+  static class NullableFixedByteAlignedReader extends NullableColumnReader {
+    protected byte[] bytes;
+
+    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+      // set up metadata
+      this.readStartInBytes = pageReader.readPosInBytes;
+      this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+      this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+      this.bytes = pageReader.pageDataByteArray;
+
+      // fill in data.
+      vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
+    }
+  }
+
+  static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
+                                SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+        }
+      }
+    }
+  }
+
+  static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
+                                   SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+      }
+    }
+  }
+
+  static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
+                                   SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+      }
+    }
+  }
+
+  static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
+                                  SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+      }
+    }
+  }
+
+  static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader {
+
+    protected int dataTypeLengthInBytes;
+
+    NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+      // set up metadata
+      this.readStartInBytes = pageReader.readPosInBytes;
+      this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+      this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+      this.bytes = pageReader.pageDataByteArray;
+
+      dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
+      for (int i = 0; i < recordsReadInThisIteration; i++) {
+        addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
+      }
+    }
+
+    abstract void addNext(int start, int index);
+  }
+
+  public static class NullableDateReader extends NullableConvertedReader {
+
+    NullableDateVector dateVector;
+
+    NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                       boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = (NullableDateVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+    }
+
+    // copied out of the parquet library; didn't want to deal with the unneeded throws statement they had declared
+    public static int readIntLittleEndian(byte[] in, int offset) {
+      int ch4 = in[offset] & 0xff;
+      int ch3 = in[offset + 1] & 0xff;
+      int ch2 = in[offset + 2] & 0xff;
+      int ch1 = in[offset + 3] & 0xff;
+      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    }
+
+  }
+
+  public static class NullableDecimal28Reader extends NullableConvertedReader {
+
+    NullableDecimal28SparseVector decimal28Vector;
+
+    NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                            boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      decimal28Vector = (NullableDecimal28SparseVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int width = NullableDecimal28SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
+          schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
+    }
+  }
+
+  public static class NullableDecimal38Reader extends NullableConvertedReader {
+
+    NullableDecimal38SparseVector decimal38Vector;
+
+    NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                            boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      decimal38Vector = (NullableDecimal38SparseVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int width = NullableDecimal38SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
+          schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
+    }
+  }
+
+}
\ No newline at end of file
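
Note: the NullableDictionary*Reader classes above read dictionary-encoded pages: the data page holds small integer ids, and the dictionary page holds the distinct values those ids reference, so decoding amounts to an id lookup. A tiny illustration of the concept (plain arrays, not the parquet ValuesReader API):

    // dictionary decoding in miniature: ids from the data page index into the
    // values recovered from the dictionary page
    long[] dictionary = {100L, 250L, 999L};   // distinct values from the dictionary page
    int[] ids = {0, 2, 2, 1};                 // per-row ids from the data page
    long[] decoded = new long[ids.length];
    for (int i = 0; i < ids.length; i++) {
      decoded[i] = dictionary[ids[i]];        // {100, 999, 999, 250}
    }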

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
new file mode 100644
index 0000000..2be9a37
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -0,0 +1,130 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> {
+
+  int nullsRead;
+  boolean currentValNull = false;
+
+  NullableVarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+                                SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+  }
+
+  public abstract boolean setSafe(int index, byte[] value, int start, int length);
+
+  public abstract int capacity();
+
+  public void reset() {
+    bytesReadInCurrentPass = 0;
+    valuesReadInCurrentPass = 0;
+    nullsRead = 0;
+    pageReader.valuesReadyToRead = 0;
+  }
+
+  protected void postPageRead() {
+    currLengthDeterminingDictVal = null;
+    pageReader.valuesReadyToRead = 0;
+  }
+
+  protected boolean readAndStoreValueSizeInformation() throws IOException {
+    // we need to read all of the lengths to determine if this value will fit in the current vector;
+    // as we can only read each definition level once, we have to store the last one, because we will need it
+    // at the start of the next read if we decide, after reading all of the varlength values in this record,
+    // that it will not fit in this batch
+    currentValNull = false;
+    if ( currDefLevel == -1 ) {
+      currDefLevel = pageReader.definitionLevels.readInteger();
+    }
+    if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel){
+      nullsRead++;
+      // set a length of zero; each index in the vector defaults to null, so there is no need to set the nullability
+      variableWidthVector.getMutator().setValueLengthSafe(
+          valuesReadInCurrentPass + pageReader.valuesReadyToRead, 0);
+      currentValNull = true;
+      return false;// field is null, no length to add to data vector
+    }
+
+    if (usingDictionary) {
+      if (currLengthDeterminingDictVal == null) {
+        currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes();
+      }
+      currDictValToWrite = currLengthDeterminingDictVal;
+      // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
+      dataTypeLengthInBits = currLengthDeterminingDictVal.length();
+    }
+    else {
+      // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
+      dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+          (int) pageReader.readyToReadPosInBytes);
+    }
+    // I think this also needs to happen if it is null for the random access
+    if (! variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, dataTypeLengthInBits)) {
+      return true;
+    }
+    boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray,
+        (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
+    assert success;
+    return false;
+  }
+
+  public void updateReadyToReadPosition() {
+    if (! currentValNull){
+      pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4;
+    }
+    pageReader.valuesReadyToRead++;
+    currLengthDeterminingDictVal = null;
+  }
+
+  public void updatePosition() {
+    if (! currentValNull){
+      pageReader.readPosInBytes += dataTypeLengthInBits + 4;
+      bytesReadInCurrentPass += dataTypeLengthInBits;
+    }
+    currentValNull = false;
+    valuesReadInCurrentPass++;
+  }
+
+  @Override
+  protected void readField(long recordsToRead) {
+    if (usingDictionary) {
+      currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+      // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
+    }
+    dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
+    currentValNull = variableWidthVector.getAccessor().getObject(valuesReadInCurrentPass) == null;
+    // again, I am re-purposing the unused field here; it is a length in BYTES, not bits
+    if (! currentValNull){
+      boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray,
+          (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
+      assert success;
+    }
+    updatePosition();
+  }
+}
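
Note: readAndStoreValueSizeInformation above relies on the plain-encoded BYTE_ARRAY layout: each non-null value is stored as a 4-byte little-endian length followed by that many bytes, which is why the reader advances its position by dataTypeLengthInBits + 4. A minimal sketch of decoding one such value from a page buffer (assumes plain encoding; hypothetical helper, not Drill's API):

    // decode one plain-encoded BYTE_ARRAY value starting at 'offset':
    //   [4-byte little-endian length][length bytes of data]
    static byte[] readVarLenValue(byte[] page, int offset) {
      int length = (page[offset] & 0xff)
          | (page[offset + 1] & 0xff) << 8
          | (page[offset + 2] & 0xff) << 16
          | (page[offset + 3] & 0xff) << 24;
      byte[] value = new byte[length];
      System.arraycopy(page, offset + 4, value, 0, length);
      return value;
    }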

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
new file mode 100644
index 0000000..1d300bb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.bytes.BytesInput;
+import parquet.column.Dictionary;
+import parquet.column.ValuesType;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.Page;
+import parquet.column.values.ValuesReader;
+import parquet.column.values.dictionary.DictionaryValuesReader;
+import parquet.format.PageHeader;
+import parquet.format.PageType;
+import parquet.format.Util;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+// class to keep track of the read position of variable length columns
+final class PageReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
+
+  private final ColumnReader parentColumnReader;
+  private final ColumnDataReader dataReader;
+  // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
+  Page currentPage;
+  // buffer to store bytes of current page
+  byte[] pageDataByteArray;
+
+  // for variable length data we need to keep track of our current position in the page data
+  // as the values and lengths are intermixed, making random access to the length data impossible
+  long readyToReadPosInBytes;
+  // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
+  long readPosInBytes;
+  // bit shift needed for the next page if the last one did not line up with a byte boundary
+  int bitShift;
+  // storage space for extra bits at the end of a page if they did not line up with a byte boundary
+  // prevents the need to keep the entire last page, as these leftover bits need to be added to the next batch
+  //byte extraBits;
+
+  // used for columns where the number of values that will fit in a vector is unknown
+  // currently used for variable length
+  // TODO - reuse this when compressed vectors are added, where fixed length values will take up a
+  // variable amount of space
+  // For example: if nulls are stored without extra space left in the data vector
+  // (this is currently simplifying random access to the data during processing, but increases the size of the vectors)
+  int valuesReadyToRead;
+
+  // the number of values read out of the last page
+  int valuesRead;
+  int byteLength;
+  //int rowGroupIndex;
+  ValuesReader definitionLevels;
+  ValuesReader repetitionLevels;
+  ValuesReader valueReader;
+  ValuesReader dictionaryLengthDeterminingReader;
+  ValuesReader dictionaryValueReader;
+  Dictionary dictionary;
+  PageHeader pageHeader = null;
+
+  PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
+    this.parentColumnReader = parentStatus;
+
+    long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
+    long start = columnChunkMetaData.getFirstDataPageOffset();
+    try {
+      FSDataInputStream f = fs.open(path);
+      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
+      if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+        f.seek(columnChunkMetaData.getDictionaryPageOffset());
+        PageHeader pageHeader = Util.readPageHeader(f);
+        assert pageHeader.type == PageType.DICTIONARY_PAGE;
+        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress( //
+                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+                pageHeader.getUncompressed_page_size(), //
+                parentColumnReader.columnChunkMetaData.getCodec());
+        DictionaryPage page = new DictionaryPage(
+            bytesIn,
+            pageHeader.uncompressed_page_size,
+            pageHeader.dictionary_page_header.num_values,
+            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+        );
+        this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+      }
+    } catch (IOException e) {
+      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + path.getName(), e);
+    }
+    
+  }
+
+  
+  /**
+   * Grab the next page.
+   *
+   * @return - if another page was present
+   * @throws java.io.IOException
+   */
+  public boolean next() throws IOException {
+
+    currentPage = null;
+    valuesRead = 0;
+    valuesReadyToRead = 0;
+
+    // TODO - the metadata for total size appears to be incorrect for impala-generated files; need to find the cause
+    // and submit a bug report
+    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+      return false;
+    }
+
+    // next, we need to decompress the bytes
+    // TODO - figure out if we need to handle multiple dictionary pages; I believe it may be limited to one.
+    // If there can be multiple dictionary pages, I think we are clobbering parts of the dictionary here.
+    do {
+      pageHeader = dataReader.readPageHeader();
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress( //
+                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+                pageHeader.getUncompressed_page_size(), //
+                parentColumnReader.columnChunkMetaData.getCodec());
+        DictionaryPage page = new DictionaryPage(
+            bytesIn,
+            pageHeader.uncompressed_page_size,
+            pageHeader.dictionary_page_header.num_values,
+            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+        );
+        this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
+      }
+    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+
+    BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+        .decompress( //
+            dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // 
+            pageHeader.getUncompressed_page_size(), //
+            parentColumnReader.columnChunkMetaData.getCodec());
+    currentPage = new Page(
+        bytesIn,
+        pageHeader.data_page_header.num_values,
+        pageHeader.uncompressed_page_size,
+        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+    );
+
+    byteLength = pageHeader.uncompressed_page_size;
+
+    if (currentPage == null) {
+      return false;
+    }
+
+    pageDataByteArray = currentPage.getBytes().toByteArray();
+
+    readPosInBytes = 0;
+    if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+      repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+      repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      // we know that the first value will be a 0; at the end of each list of repeated values we will hit another 0 indicating
+      // a new record, although we don't know the length until we hit it (and this is a one-way stream of integers), so we
+      // read the first zero here to simplify the reading process and start reading the first value the same as all
+      // of the rest. Effectively we are 'reading' the non-existent value in front of the first, allowing direct access to
+      // the first list of repetition levels
+      readPosInBytes = repetitionLevels.getNextOffset();
+      repetitionLevels.readInteger();
+    }
+    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+      parentColumnReader.currDefLevel = -1;
+      if (!currentPage.getValueEncoding().usesDictionary()) {
+        parentColumnReader.usingDictionary = false;
+        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        readPosInBytes = definitionLevels.getNextOffset();
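+        // boolean values are bit packed, so a dedicated values reader is initialized here, positioned just past
+        // the definition levels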
+        if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+          valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+          valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        }
+      } else {
+        parentColumnReader.usingDictionary = true;
+        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        readPosInBytes = definitionLevels.getNextOffset();
+        // initialize two dictionary readers: one determines the length of each value, the other actually copies
+        // the values out into the vectors
+        dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
+        dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        dictionaryValueReader = new DictionaryValuesReader(dictionary);
+        dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        this.parentColumnReader.usingDictionary = true;
+      }
+    }
+    // readPosInBytes is used when actually copying values into the vector, after we have determined how many will fit.
+    // readyToReadPosInBytes serves the same purpose for types where the values that fit must be counted one record at
+    // a time, such as variable length data. Both positions start at the same location, just past the definition and
+    // repetition level data that is stored alongside the page data itself.
+    readyToReadPosInBytes = readPosInBytes;
+    return true;
+  }
+  
+  public void clear(){
+    this.dataReader.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
new file mode 100644
index 0000000..ad849b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ParquetFixedWidthDictionaryReader extends ColumnReader{
+
+  ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+                                    SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+  }
+
+  @Override
+  public void readField(long recordsToReadInThisPass) {
+
+    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+    int defLevel;
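+    // walk one definition level per value; only defined values are copied, and only INT64 columns are handled so far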
+    for (int i = 0; i < recordsReadInThisIteration; i++){
+      defLevel = pageReader.definitionLevels.readInteger();
+      // if the value is defined
+      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
+        if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
+          ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
+              pageReader.valueReader.readLong() );
+      }
+      // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
new file mode 100644
index 0000000..2228787
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.PrimitiveType;
+
+public class ParquetRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+
+  // this value can be inflated to read in multiple value vectors at once, and then break them up into smaller vectors
+  private static final int NUMBER_OF_VECTORS = 1;
+  private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
+  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // the batch length expressed in bits
+  private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+
+  // TODO - should probably find a smarter way to set this, currently 1 megabyte
+  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
+  public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
+  private static final String SEPARATOR = System.getProperty("file.separator");
+
+  // used for clearing the last n bits of a byte
+  public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
+  // used for clearing the first n bits of a byte
+  public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
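+  // e.g. (b & endBitMasks[0]) clears only the lowest bit (mask 0xFE), while (b & startBitMasks[0]) clears only the
+  // highest bit (mask 0x7F)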
+
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength;
+  private int recordsPerBatch;
+  private long totalRecords;
+  private long rowGroupOffset;
+
+  private List<ColumnReader> columnStatuses;
+  private FileSystem fileSystem;
+  private long batchSize;
+  Path hadoopPath;
+  private VarLenBinaryReader varLengthReader;
+  private ParquetMetadata footer;
+  private List<SchemaPath> columns;
+  private final CodecFactoryExposer codecFactoryExposer;
+  int rowGroupIndex;
+
+  public ParquetRecordReader(FragmentContext fragmentContext, //
+                             String path, //
+                             int rowGroupIndex, //
+                             FileSystem fs, //
+                             CodecFactoryExposer codecFactoryExposer, //
+                             ParquetMetadata footer, //
+                             List<SchemaPath> columns) throws ExecutionSetupException {
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+        columns);
+  }
+
+  public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
+                             String path, int rowGroupIndex, FileSystem fs,
+                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
+                             List<SchemaPath> columns) throws ExecutionSetupException {
+    hadoopPath = new Path(path);
+    fileSystem = fs;
+    this.codecFactoryExposer = codecFactoryExposer;
+    this.rowGroupIndex = rowGroupIndex;
+    this.batchSize = batchSize;
+    this.footer = footer;
+    this.columns = columns;
+  }
+
+  public CodecFactoryExposer getCodecFactoryExposer() {
+    return codecFactoryExposer;
+  }
+
+  public Path getHadoopPath() {
+    return hadoopPath;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  public int getRowGroupIndex() {
+    return rowGroupIndex;
+  }
+
+  public int getBitWidthAllFixedFields() {
+    return bitWidthAllFixedFields;
+  }
+
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  /**
+   * @param type a fixed length type from the parquet library enum
+   * @return the length of the type in bits
+   */
+  public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
+    switch (type) {
+      case INT64:   return 64;
+      case INT32:   return 32;
+      case BOOLEAN: return 1;
+      case FLOAT:   return 32;
+      case DOUBLE:  return 64;
+      case INT96:   return 96;
+      // binary and fixed length byte array
+      default:
+        throw new IllegalStateException("Length cannot be determined for type " + type);
+    }
+  }
+
+  private boolean fieldSelected(MaterializedField field){
+    // TODO - not sure if this is how we want to represent this
+    // for now it makes the existing tests pass, simply selecting
+    // all available data if no columns are provided
+    if (this.columns != null){
+      for (SchemaPath expr : this.columns){
+        if ( field.matches(expr)){
+          return true;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+
+    columnStatuses = new ArrayList<>();
+    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+    allFieldsFixedLength = true;
+    ColumnDescriptor column;
+    ColumnChunkMetaData columnChunkMetaData;
+    int columnsToScan = 0;
+
+    MaterializedField field;
+    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+    FileMetaData fileMetaData;
+
+    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
+    // store a map from column name to converted types if they are non-null
+    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+    fileMetaData = metaConverter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+    for (SchemaElement se : fileMetaData.getSchema()) {
+      schemaElements.put(se.getName(), se);
+    }
+
+    // loop to add up the length of the fixed width columns and build the schema
+    for (int i = 0; i < columns.size(); ++i) {
+      column = columns.get(i);
+      logger.debug("name: " + fileMetaData.getSchema().get(i).name);
+      SchemaElement se = schemaElements.get(column.getPath()[0]);
+      MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
+      field = MaterializedField.create(toFieldName(column.getPath()),mt);
+      if ( ! fieldSelected(field)){
+        continue;
+      }
+      columnsToScan++;
+      // sum the lengths of all of the fixed length fields
+      if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+        if (column.getMaxRepetitionLevel() > 0) {
+          allFieldsFixedLength = false;
+        }
+        // There is no support for the fixed binary type in parquet yet, leaving a task here as a reminder
+        // TODO - implement this when the feature is added upstream
+        if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+          bitWidthAllFixedFields += se.getType_length() * 8;
+        } else {
+          bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+        }
+      } else {
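+        // binary columns are variable length, so the batch cannot be sized from fixed bit widths alone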
+        allFieldsFixedLength = false;
+      }
+    }
+    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+    // none of the columns in the parquet file matched the columns requested by the query
+    if (columnsToScan == 0){
+      throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
+    }
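+    // when every selected column is fixed width, size the batch from the summed bit widths (capped at 65535 records);
+    // otherwise fall back to a fixed default record count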
+    if (allFieldsFixedLength) {
+      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+    } else {
+      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+    }
+
+    try {
+      ValueVector v;
+      ConvertedType convertedType;
+      SchemaElement schemaElement;
+      ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+      // initialize all of the column read status objects
+      boolean fieldFixedLength = false;
+      for (int i = 0; i < columns.size(); ++i) {
+        column = columns.get(i);
+        columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
+        schemaElement = schemaElements.get(column.getPath()[0]);
+        convertedType = schemaElement.getConverted_type();
+        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
+        field = MaterializedField.create(toFieldName(column.getPath()), type);
+        // the field was not requested to be read
+        if ( ! fieldSelected(field)) continue;
+
+        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+        v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+          if (column.getMaxRepetitionLevel() > 0) {
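+            // a repeated fixed-width column is read by a FixedWidthRepeatedReader that wraps a plain fixed-width reader
+            // over the inner data vector; it is grouped with the variable length columns because whole records must be
+            // counted one at a time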
+            ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch,
+                ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
+            varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
+                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
+          }
+          else {
+            columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+                schemaElement));
+          }
+        } else {
+          // create a reader and add it to the appropriate list
+          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement));
+        }
+      }
+      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    } catch (Exception e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private SchemaPath toFieldName(String[] paths) {
+    return SchemaPath.getCompoundPath(paths);
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+    if (column.getMaxRepetitionLevel() > 0 ) {
+      return DataMode.REPEATED;
+    } else if (column.getMaxDefinitionLevel() == 0) {
+      return TypeProtos.DataMode.REQUIRED;
+    } else {
+      return TypeProtos.DataMode.OPTIONAL;
+    }
+  }
+
+  private void resetBatch() {
+    for (ColumnReader column : columnStatuses) {
+      column.valuesReadInCurrentPass = 0;
+    }
+    for (VarLengthColumn r : varLengthReader.columns){
+      r.valuesReadInCurrentPass = 0;
+    }
+  }
+
+  public void readAllFixedFields(long recordsToRead) throws IOException {
+    for (ColumnReader crs : columnStatuses) {
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  @Override
+  public int next() {
+    resetBatch();
+    long recordsToRead = 0;
+    try {
+      ColumnReader firstColumnStatus;
+      if (columnStatuses.size() > 0) {
+        firstColumnStatus = columnStatuses.iterator().next();
+      } else if (varLengthReader.columns.size() > 0) {
+        firstColumnStatus = varLengthReader.columns.iterator().next();
+      } else {
+        firstColumnStatus = null;
+      }
+      // TODO - replace this with new functionality of returning batches even if no columns are selected
+      // the query 'select 5 from parquetfile' should return the number of records that the parquet file contains
+      // we don't need to read any of the data, we just need to fill batches with a record count and a useless vector with
+      // the right number of values
+      if (firstColumnStatus == null) throw new DrillRuntimeException("Unexpected error reading parquet file, not reading any columns");
+
+      if (allFieldsFixedLength) {
+        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+      } else {
+        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+      }
+
+      if (allFieldsFixedLength) {
+        readAllFixedFields(recordsToRead);
+      } else { // variable length columns
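+        // the variable length readers first determine how many complete records fit in this batch, then the fixed
+        // width readers are read for that same record count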
+        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+        readAllFixedFields(fixedRecordsToRead);
+      }
+
+      return firstColumnStatus.getRecordsReadInCurrentPass();
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    for (ColumnReader column : columnStatuses) {
+      column.clear();
+    }
+    columnStatuses.clear();
+
+    for (VarLengthColumn r : varLengthReader.columns){
+      r.clear();
+    }
+    varLengthReader.columns.clear();
+  }
+}

