drill-commits mailing list archives

From jacq...@apache.org
Subject [14/17] git commit: DRILL-945: Implementation of repeated reader and writer for parquet. Includes a fairly substantial refactoring of the overall reader structure.
Date Tue, 29 Jul 2014 15:38:26 GMT
DRILL-945: Implementation of repeated reader and writer for parquet. Includes a fairly substantial refactoring of the overall reader structure.

Fix the record counts expected in parquet read tests; previously the wrong record counts were expected, and mismatches were only sent to the logger as debug messages, not errors, so they did not flag the failing tests.

After review: rename PageReadStatus to PageReader, remove the underscore from the package name column_readers, and address a review comment from Parth in VarLengthColumnReaders.

Fix a regression in the nullable varlength reader for parquet. Change output names in parquet writer/reader tests to prevent issues with files not being deleted correctly during tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5b73c214
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5b73c214
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5b73c214

Branch: refs/heads/master
Commit: 5b73c2140f100bb74cbf4a20cd31d13ebf5a4b88
Parents: 686a282
Author: Jason Altekruse <altekrusejason@gmail.com>
Authored: Tue Jul 22 13:08:33 2014 -0500
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Tue Jul 29 08:36:27 2014 -0700

----------------------------------------------------------------------
 .../codegen/templates/NullableValueVectors.java |    19 +-
 .../templates/ParquetOutputRecordWriter.java    |     4 +
 .../codegen/templates/RepeatedValueVectors.java |    28 +-
 .../templates/VariableLengthVectors.java        |    18 +-
 .../drill/exec/store/parquet/BitReader.java     |    90 -
 .../exec/store/parquet/ColumnDataReader.java    |     6 +-
 .../drill/exec/store/parquet/ColumnReader.java  |   140 -
 .../store/parquet/FixedByteAlignedReader.java   |   154 -
 .../exec/store/parquet/NullableBitReader.java   |    66 -
 .../store/parquet/NullableColumnReader.java     |   136 -
 .../parquet/NullableFixedByteAlignedReader.java |   156 -
 .../NullableFixedByteAlignedReaders.java        |   169 -
 .../exec/store/parquet/PageReadStatus.java      |   191 -
 .../ParquetFixedWidthDictionaryReader.java      |    54 -
 .../exec/store/parquet/ParquetFormatPlugin.java |     2 +-
 .../exec/store/parquet/ParquetRecordReader.java |   711 -
 .../exec/store/parquet/ParquetRecordWriter.java |     3 +-
 .../store/parquet/ParquetScanBatchCreator.java  |     2 +-
 .../exec/store/parquet/VarLenBinaryReader.java  |   190 -
 .../store/parquet/VarLengthColumnReaders.java   |   365 -
 .../store/parquet/columnreaders/BitReader.java  |    89 +
 .../parquet/columnreaders/ColumnReader.java     |   220 +
 .../columnreaders/ColumnReaderFactory.java      |   175 +
 .../columnreaders/FixedByteAlignedReader.java   |   146 +
 .../columnreaders/FixedWidthRepeatedReader.java |   213 +
 .../columnreaders/NullableBitReader.java        |    61 +
 .../columnreaders/NullableColumnReader.java     |   139 +
 .../NullableFixedByteAlignedReaders.java        |   238 +
 .../NullableVarLengthValuesColumn.java          |   130 +
 .../store/parquet/columnreaders/PageReader.java |   225 +
 .../ParquetFixedWidthDictionaryReader.java      |    53 +
 .../columnreaders/ParquetRecordReader.java      |   361 +
 .../ParquetToDrillTypeConverter.java            |   236 +
 .../columnreaders/VarLenBinaryReader.java       |    81 +
 .../parquet/columnreaders/VarLengthColumn.java  |    60 +
 .../columnreaders/VarLengthColumnReaders.java   |   294 +
 .../columnreaders/VarLengthValuesColumn.java    |    96 +
 .../exec/vector/RepeatedFixedWidthVector.java   |    13 +
 .../vector/RepeatedVariableWidthVector.java     |     2 +
 .../drill/exec/vector/VariableWidthVector.java  |    12 +-
 .../exec/vector/complex/RepeatedListVector.java |    28 +-
 .../exec/vector/complex/RepeatedMapVector.java  |    29 +-
 .../physical/impl/writer/TestParquetWriter.java |    88 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   100 +-
 .../store/parquet/ParquetResultListener.java    |    45 +-
 .../exec/store/parquet/TestFileGenerator.java   |    49 +-
 .../resources/parquet/alltypes_repeated.json    |    28 +
 .../test/resources/parquet/basic_repeated.json  |     9 +
 .../test/resources/parquet/null_test_data.json  |    26 +
 ...et_repeated_performance_test_input_data.json |  2041 +
 .../resources/parquet/repeated_bool_data.json   | 73572 +++++++++++++++++
 .../resources/parquet/repeated_double_data.json |  2041 +
 .../parquet/repeated_integer_data.json          | 18481 +++++
 53 files changed, 99356 insertions(+), 2529 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index f50aae8..4873f2a 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -309,7 +309,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   }
 
   
-  public final class Accessor implements ValueVector.Accessor{
+  public final class Accessor implements ValueVector.Accessor<#if type.major = "VarLen">, VariableWidthVector.VariableWidthAccessor</#if>{
 
     final FieldReader reader = new Nullable${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
     
@@ -335,7 +335,14 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     public int isSet(int index){
       return bits.getAccessor().get(index);
     }
-    
+
+
+    <#if type.major == "VarLen">
+    public int getValueLength(int index) {
+      return values.getAccessor().getValueLength(index);
+    }
+    </#if>
+
     public void get(int index, Nullable${minor.class}Holder holder){
       values.getAccessor().get(index, holder);
       holder.isSet = bits.getAccessor().get(index);
@@ -372,7 +379,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     public void reset(){}
   }
   
-  public final class Mutator implements ValueVector.Mutator, NullableVectorDefinitionSetter{
+  public final class Mutator implements ValueVector.Mutator, NullableVectorDefinitionSetter<#if type.major = "VarLen">, VariableWidthVector.VariableWidthMutator</#if> {
     
     private int setCount;
     <#if type.major = "VarLen"> private int lastSet = -1;</#if>
@@ -412,9 +419,13 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
         if(!values.getMutator().setSafe(i, new byte[]{})) return false;
       }
       lastSet = index;
-      
+
       return true;
     }
+
+    public boolean setValueLengthSafe(int index, int length) {
+      return values.getMutator().setValueLengthSafe(index, length);
+    }
     </#if>
     
     public boolean setSafe(int index, byte[] value, int start, int length) {
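
A minimal sketch of the bits/values pairing the accessors above delegate to, with plain arrays standing in for Drill's inner vectors (the class name is illustrative, not part of this commit):

public class NullableSketch {
  public static void main(String[] args) {
    // A nullable vector pairs a 0/1 "is set" entry per slot with a data slot;
    // getValueLength() and setValueLengthSafe() above simply delegate to the
    // inner values vector, while isSet() consults the bits vector.
    int[] bits = {1, 0, 1};           // 1 = defined, 0 = null
    String[] values = {"a", "", "c"}; // slot 1 is an untouched placeholder
    int index = 1;
    System.out.println(bits[index] == 1 ? values[index] : "null");
  }
}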

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 92267e7..5284199 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -20,6 +20,7 @@ import org.joda.time.DateTimeUtils;
 import parquet.io.api.Binary;
 
 import java.lang.Override;
+import java.lang.RuntimeException;
 
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/ParquetOutputRecordWriter.java" />
@@ -83,6 +84,9 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
       return;
     }
   <#elseif mode.prefix == "Repeated" >
+    // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
+    if (valueHolder.start == valueHolder.end)
+      return;
     consumer.startField(schema.getFieldName(fieldId), fieldId);
     for (int i = valueHolder.start; i < valueHolder.end; i++) {
   </#if>
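
A minimal sketch of the start/end convention the new empty-list check relies on (an illustrative holder class, not the generated code):

public class RepeatedHolderSketch {
  int start; // index of this record's first element in the child data vector
  int end;   // index one past this record's last element

  boolean isEmpty() {
    // start == end means the record has no elements, so the writer skips
    // startField()/endField() entirely rather than writing a zero-length field.
    return start == end;
  }
}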

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 7bf84f2..b283d19 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -18,6 +18,7 @@
 
 import java.lang.Override;
 
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.mortbay.jetty.servlet.Holder;
 
@@ -63,9 +64,9 @@ package org.apache.drill.exec.vector;
   }
 
   public int getValueCapacity(){
-    return Math.min(values.getValueCapacity(), offsets.getValueCapacity());
+    return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1);
   }
-  
+
   public int getBufferSize(){
     return offsets.getBufferSize() + values.getBufferSize();
   }
@@ -271,8 +272,12 @@ package org.apache.drill.exec.vector;
   public Accessor getAccessor(){
     return accessor;
   }
-  
-  public final class Accessor implements ValueVector.Accessor{
+
+  // This is declared as a subclass of the accessor declared inside of FixedWidthVector; it is also used for
+  // variable length vectors, as they should have as consistent an interface as possible. If they need to diverge
+  // in the future, the interface should be declared in the respective value vector superclasses for fixed and variable
+  // width, and we should refer to each in the generation template.
+  public final class Accessor implements RepeatedFixedWidthVector.RepeatedAccessor{
     
     final FieldReader reader = new Repeated${minor.class}ReaderImpl(Repeated${minor.class}Vector.this);
     
@@ -367,6 +372,21 @@ package org.apache.drill.exec.vector;
     private Mutator(){
     }
 
+    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
+      return offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount);
+    }
+
+    public BaseDataValueVector getDataVector() {
+      return values;
+    }
+
+    public void setValueCounts(int parentValueCount, int childValueCount){
+      Repeated${minor.class}Vector.this.parentValueCount = parentValueCount;
+      Repeated${minor.class}Vector.this.childValueCount = childValueCount;
+      values.getMutator().setValueCount(childValueCount);
+      offsets.getMutator().setValueCount(childValueCount + 1);
+    }
+
     public boolean startNewGroup(int index) {
       if(getValueCapacity() <= index){
         return false;
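
A minimal sketch of the offsets arithmetic behind getValueCapacity() and setRepetitionAtIndexSafe() above, with plain ints in place of Drill's UInt4Vector offsets (class name illustrative):

public class OffsetsSketch {
  public static void main(String[] args) {
    // For n repeated records the offsets vector holds n + 1 entries, which is
    // why getValueCapacity() subtracts one: record i spans
    // values[offsets[i] .. offsets[i + 1]).
    int[] offsets = new int[4]; // offsets[0] is always 0
    offsets[1] = offsets[0] + 3; // setRepetitionAtIndexSafe(0, 3): 3 children
    offsets[2] = offsets[1];     // setRepetitionAtIndexSafe(1, 0): empty list
    System.out.println("record 0 holds " + (offsets[1] - offsets[0]) + " values");
  }
}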

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 7f01058..1d30acb 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -15,6 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
 <@pp.dropOutputFile />
 <#list vv.types as type>
 <#list type.minor as minor>
@@ -279,7 +284,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     return mutator;
   }
   
-  public final class Accessor extends BaseValueVector.BaseAccessor{
+  public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor {
     final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this);
     
     public FieldReader getReader(){
@@ -295,6 +300,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data.getBytes(startIdx, dst, 0, length);
       return dst;
     }
+
+    public int getValueLength(int index) {
+      return offsetVector.getAccessor().get(index + 1) - offsetVector.getAccessor().get(index);
+    }
     
     public void get(int index, ${minor.class}Holder holder){
       holder.start = offsetVector.getAccessor().get(index);
@@ -355,7 +364,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
    *
    * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
    */
-  public final class Mutator extends BaseValueVector.BaseMutator{
+  public final class Mutator extends BaseValueVector.BaseMutator implements VariableWidthVector.VariableWidthMutator {
 
     /**
      * Set the variable length element at the specified index to the supplied byte array.
@@ -417,7 +426,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       return true;
     }
 
-   
+    public boolean setValueLengthSafe(int index, int length) {
+      return offsetVector.getMutator().setSafe(index + 1, offsetVector.getAccessor().get(index) + length);
+    }
+
     public boolean setSafe(int index, Nullable${minor.class}Holder holder){
       assert holder.isSet == 1;
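
A minimal sketch of the two offset operations added above, getValueLength() and setValueLengthSafe(), with a plain array in place of the offset vector (class name illustrative):

public class VarWidthSketch {
  public static void main(String[] args) {
    int[] offsets = {0, 3, 3, 8}; // three values: "foo", "", "hello"
    int index = 2;
    // getValueLength(index) is the difference of adjacent offsets.
    int length = offsets[index + 1] - offsets[index]; // 5
    // setValueLengthSafe(index, len) is the inverse: it only advances the
    // offset, offsets[index + 1] = offsets[index] + len, leaving the data
    // buffer to be filled separately.
    System.out.println(length);
  }
}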
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
deleted file mode 100644
index 7ae95cd..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-final class BitReader extends ColumnReader {
-
-  private byte currentByte;
-  private byte nextByte;
-  private byte[] bytes;
-  
-  BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-            boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-  }
-
-  @Override
-  protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
-    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
-        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
-    readStartInBytes = pageReadStatus.readPosInBytes;
-    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-    readLength = (int) Math.ceil(readLengthInBits / 8.0);
-
-    bytes = pageReadStatus.pageDataByteArray;
-    // standard read, using memory mapping
-    if (pageReadStatus.bitShift == 0) {
-      ((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
-          (int) readStartInBytes, (int) readLength);
-    } else { // read in individual values, because a bitshift is necessary with where the last page or batch ended
-
-      vectorData = ((BaseDataValueVector) valueVec).getData();
-      nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
-      readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
-
-      int i = 0;
-      // read individual bytes with appropriate shifting
-      for (; i < (int) readLength; i++) {
-        currentByte = nextByte;
-        currentByte = (byte) (currentByte >>> pageReadStatus.bitShift);
-        // mask the bits about to be added from the next byte
-        currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReadStatus.bitShift - 1]);
-        // if we are not on the last byte
-        if ((int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i < pageReadStatus.byteLength) {
-          // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits
-          nextByte = bytes[(int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i];
-          currentByte = (byte) (currentByte | nextByte
-              << (8 - pageReadStatus.bitShift)
-              & ParquetRecordReader.endBitMasks[8 - pageReadStatus.bitShift - 1]);
-        }
-        vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte);
-      }
-      vectorData.setIndex(0, (valuesReadInCurrentPass / 8)
-          + (int) readLength - 1);
-      vectorData.capacity(vectorData.writerIndex() + 1);
-    }
-
-    // check if the values in this page did not end on a byte boundary, store a number of bits the next page must be
-    // shifted by to read all of the values into the vector without leaving space
-    if (readLengthInBits % 8 != 0) {
-      pageReadStatus.bitShift = (int) readLengthInBits % 8;
-    } else {
-      pageReadStatus.bitShift = 0;
-    }
-  }
-}
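
A minimal sketch of the shift-and-mask step in the deleted reader above, for the case where the previous page ended mid-byte (the constants are illustrative; the real code drives this with the startBitMasks/endBitMasks tables on ParquetRecordReader):

public class BitShiftSketch {
  public static void main(String[] args) {
    int bitShift = 3;                     // bits already consumed from currentByte
    byte currentByte = (byte) 0b10110101;
    byte nextByte    = (byte) 0b01101001;
    // Drop the consumed low bits, then splice the next byte's low bits into
    // the freed high positions, mirroring the loop above.
    byte out = (byte) ((currentByte & 0xFF) >>> bitShift);
    out |= (byte) (nextByte << (8 - bitShift));
    System.out.println(Integer.toBinaryString(out & 0xFF)); // 110110
  }
}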

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index 8c6f120..80fbd80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -28,7 +28,7 @@ import parquet.bytes.BytesInput;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 
-class ColumnDataReader {
+public class ColumnDataReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
   
   private final long endPosition;
@@ -41,11 +41,7 @@ class ColumnDataReader {
   }
   
   public PageHeader readPageHeader() throws IOException{
-    try{
     return Util.readPageHeader(input);
-    }catch (IOException e) {
-      throw e;
-    }
   }
   
   public BytesInput getPageAsBytesInput(int pageLength) throws IOException{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
deleted file mode 100644
index 775fc73..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.store.VectorHolder;
-import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-import java.io.IOException;
-
-abstract class ColumnReader<V extends ValueVector> {
-  
-  final ParquetRecordReader parentReader;
-  
-  // Value Vector for this column
-  final V valueVec;
-
-  ColumnDescriptor getColumnDescriptor() {
-    return columnDescriptor;
-  }
-
-  // column description from the parquet library
-  final ColumnDescriptor columnDescriptor;
-  // metadata of the column, from the parquet library
-  final ColumnChunkMetaData columnChunkMetaData;
-  // status information on the current page
-  final PageReadStatus pageReadStatus;
-
-  final SchemaElement schemaElement;
-  boolean usingDictionary;
-
-  // quick reference to see if the field is fixed length (as this requires an instanceof)
-  final boolean isFixedLength;
-
-  // counter for the total number of values read from one or more pages
-  // when a batch is filled all of these values should be the same for all of the columns
-  int totalValuesRead;
-  
-  // counter for the values that have been read in this pass (a single call to the next() method)
-  int valuesReadInCurrentPass;
-  
-  // length of single data value in bits, if the length is fixed
-  int dataTypeLengthInBits;
-  int bytesReadInCurrentPass;
-
-  protected ByteBuf vectorData;
-  // when reading definition levels for nullable columns, it is a one-way stream of integers
-  // when reading var length data, where we don't know if all of the records will fit until we've read all of them
-  // we must store the last definition level an use it in at the start of the next batch
-  int currDefLevel;
-
-  // variables for a single read pass
-  long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
-
-  protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-    this.parentReader = parentReader;
-    this.columnDescriptor = descriptor;
-    this.columnChunkMetaData = columnChunkMetaData;
-    this.isFixedLength = fixedLength;
-    this.schemaElement = schemaElement;
-
-    if (allocateSize > 1) {
-      valueVec =  v;
-    } else {
-      valueVec =  v;
-    }
-
-
-    this.pageReadStatus = new PageReadStatus(this, parentReader.fileSystem, parentReader.hadoopPath, columnChunkMetaData);
-
-    if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
-      } else {
-        dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
-      }
-    }
-
-  }
-
-  public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
-    readStartInBytes = 0;
-    readLength = 0;
-    readLengthInBits = 0;
-    recordsReadInThisIteration = 0;
-    vectorData = ((BaseValueVector) valueVec).getData();
-    do {
-      // if no page has been read, or all of the records have been read out of a page, read the next one
-      if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
-        if (!pageReadStatus.next()) {
-          break;
-        }
-      }
-
-      readField(recordsToReadInThisPass, firstColumnStatus);
-
-      valuesReadInCurrentPass += recordsReadInThisIteration;
-      totalValuesRead += recordsReadInThisIteration;
-      pageReadStatus.valuesRead += recordsReadInThisIteration;
-      if (readStartInBytes + readLength >= pageReadStatus.byteLength) {
-        pageReadStatus.next();
-      } else {
-        pageReadStatus.readPosInBytes = readStartInBytes + readLength;
-      }
-    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
-    valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
-  }
-
-  public void clear() {
-    valueVec.clear();
-    this.pageReadStatus.clear();
-  }
-
-  protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
-}
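
A minimal sketch of the batch/page loop in the deleted readAllFixedFields() above (a hypothetical page shape; the real code also tracks read positions in bytes and bits):

public class ReadLoopSketch {
  // Keep reading until the batch target is met or the column runs out of pages.
  static int readBatch(java.util.Iterator<int[]> pages, int target) {
    int read = 0, posInPage = 0;
    int[] page = pages.hasNext() ? pages.next() : null;
    while (read < target && page != null) {
      int n = Math.min(page.length - posInPage, target - read); // readField()
      read += n;
      posInPage += n;
      if (posInPage == page.length) { // page exhausted: advance, like next()
        page = pages.hasNext() ? pages.next() : null;
        posInPage = 0;
      }
    }
    return read; // becomes the vector's value count
  }
}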

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
deleted file mode 100644
index 26e1f09..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.DateHolder;
-import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
-import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.vector.DateVector;
-import org.apache.drill.exec.vector.Decimal28SparseVector;
-import org.apache.drill.exec.vector.Decimal38SparseVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.joda.time.DateTimeUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import java.math.BigDecimal;
-
-class FixedByteAlignedReader extends ColumnReader {
-
-  protected byte[] bytes;
-
-  
-  FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                         boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-  }
-
-  // this method is called by its superclass during a read loop
-  @Override
-  protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
-    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
-        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
-    readStartInBytes = pageReadStatus.readPosInBytes;
-    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-    readLength = (int) Math.ceil(readLengthInBits / 8.0);
-
-    bytes = pageReadStatus.pageDataByteArray;
-    // vectorData is assigned by the superclass read loop method
-    vectorData.writeBytes(bytes,
-        (int) readStartInBytes, (int) readLength);
-  }
-
-  public static abstract class ConvertedReader extends FixedByteAlignedReader {
-
-    protected int dataTypeLengthInBytes;
-
-    ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                           boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-      recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
-              - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
-      readStartInBytes = pageReadStatus.readPosInBytes;
-      readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-      readLength = (int) Math.ceil(readLengthInBits / 8.0);
-
-      bytes = pageReadStatus.pageDataByteArray;
-
-      dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
-      for (int i = 0; i < recordsReadInThisIteration; i++) {
-        addNext((int)readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
-      }
-    }
-
-    /**
-     * Reads from bytes, converts, and writes to buffer
-     * @param start the index in bytes to start reading from
-     * @param index the index of the ValueVector
-     */
-    abstract void addNext(int start, int index);
-  }
-
-  public static class DateReader extends ConvertedReader {
-
-    DateVector dateVector;
-
-    DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      dateVector = (DateVector) v;
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
-          NullableFixedByteAlignedReader.NullableDateReader.readIntLittleEndian(bytes, start)
-              - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
-    }
-  }
-
-  public static class Decimal28Reader extends ConvertedReader {
-
-    Decimal28SparseVector decimal28Vector;
-
-    Decimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      decimal28Vector = (Decimal28SparseVector) v;
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
-      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
-    }
-  }
-
-  public static class Decimal38Reader extends ConvertedReader {
-
-    Decimal38SparseVector decimal38Vector;
-
-    Decimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      decimal38Vector = (Decimal38SparseVector) v;
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
-      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
deleted file mode 100644
index 29ca30a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.ValueVector;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import java.io.IOException;
-
-/**
- * This class is used in conjunction with its superclass to read nullable bit columns in a parquet file.
- * It currently is using an inefficient value-by-value approach.
- * TODO - make this more efficient by copying runs of values like in NullableFixedByteAlignedReader
- * This will also involve incorporating the ideas from the BitReader (the reader for non-nullable bits)
- * because page/batch boundaries that do not land on byte boundaries require shifting of all of the values
- * in the next batch.
- */
-final class NullableBitReader extends ColumnReader {
-
-  NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-  }
-
-  @Override
-  public void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
-    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
-        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-    int defLevel;
-    for (int i = 0; i < recordsReadInThisIteration; i++){
-      defLevel = pageReadStatus.definitionLevels.readInteger();
-      // if the value is defined
-      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
-        if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
-            pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) {
-          throw new RuntimeException();
-        }
-      }
-      // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
deleted file mode 100644
index 585fd66..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
-import org.apache.drill.exec.vector.ValueVector;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import java.io.IOException;
-
-abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{
-
-  int nullsFound;
-  // used to skip nulls found
-  int rightBitShift;
-  // used when copying less than a byte worth of data at a time, to indicate the number of used bits in the current byte
-  int bitsUsed;
-
-  NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-  }
-
-  public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
-    readStartInBytes = 0;
-    readLength = 0;
-    readLengthInBits = 0;
-    recordsReadInThisIteration = 0;
-    vectorData = ((BaseValueVector)valueVec).getData();
-
-    do {
-      // if no page has been read, or all of the records have been read out of a page, read the next one
-      if (pageReadStatus.currentPage == null
-          || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
-        if (!pageReadStatus.next()) {
-          break;
-        }
-      }
-
-      // values need to be spaced out where nulls appear in the column
-      // leaving blank space for nulls allows for random access to values
-      // to optimize copying data out of the buffered disk stream, runs of defined values
-      // are located and copied together, rather than copying individual values
-
-      long runStart = pageReadStatus.readPosInBytes;
-      int runLength;
-      int currentDefinitionLevel = 0;
-      int currentValueIndexInVector = (int) recordsReadInThisIteration;
-      boolean lastValueWasNull = true;
-      int definitionLevelsRead;
-      // loop to find the longest run of defined values available, can be preceded by several nulls
-      while (true){
-        definitionLevelsRead = 0;
-        lastValueWasNull = true;
-        nullsFound = 0;
-        runLength = 0;
-        if (currentValueIndexInVector == recordsToReadInThisPass
-            || currentValueIndexInVector >= valueVec.getValueCapacity()) {
-          break;
-        }
-        while(currentValueIndexInVector < recordsToReadInThisPass
-            && currentValueIndexInVector < valueVec.getValueCapacity()
-            && pageReadStatus.valuesRead + definitionLevelsRead < pageReadStatus.currentPage.getValueCount()){
-          currentDefinitionLevel = pageReadStatus.definitionLevels.readInteger();
-          definitionLevelsRead++;
-          if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
-            // a run of non-null values was found, break out of this loop to do a read in the outer loop
-            nullsFound++;
-            if ( ! lastValueWasNull ){
-              currentValueIndexInVector++;
-              break;
-            }
-            lastValueWasNull = true;
-          }
-          else{
-            if (lastValueWasNull){
-              runStart = pageReadStatus.readPosInBytes;
-              runLength = 0;
-              lastValueWasNull = false;
-            }
-            runLength++;
-            ((NullableVectorDefinitionSetter)valueVec.getMutator()).setIndexDefined(currentValueIndexInVector);
-          }
-          currentValueIndexInVector++;
-        }
-        pageReadStatus.readPosInBytes = runStart;
-        recordsReadInThisIteration = runLength;
-
-        readField( runLength, firstColumnStatus);
-        int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
-        if ( dataTypeLengthInBits > 8  || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
-          ((BaseValueVector) valueVec).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
-        }
-        else if (dataTypeLengthInBits < 8){
-          rightBitShift += dataTypeLengthInBits * nullsFound;
-        }
-        recordsReadInThisIteration += nullsFound;
-        valuesReadInCurrentPass += recordsReadInThisIteration;
-        totalValuesRead += recordsReadInThisIteration;
-        pageReadStatus.valuesRead += recordsReadInThisIteration;
-        if ( (readStartInBytes + readLength >= pageReadStatus.byteLength && bitsUsed == 0)
-            || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
-          if (!pageReadStatus.next()) {
-            break;
-          }
-        } else {
-          pageReadStatus.readPosInBytes = readStartInBytes + readLength;
-        }
-      }
-    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
-    valueVec.getMutator().setValueCount(
-        valuesReadInCurrentPass);
-  }
-
-  protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
-}
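
A minimal sketch of the run detection in the deleted readAllFixedFields() above: definition levels equal to the column's max mark non-null values, consecutive non-nulls are copied as one run, and nulls leave blank slots so values stay randomly accessible (class name illustrative):

public class NullRunSketch {
  public static void main(String[] args) {
    int maxDef = 1;
    int[] definitionLevels = {1, 1, 0, 1, 1, 1, 0};
    int i = 0;
    while (i < definitionLevels.length) {
      if (definitionLevels[i] < maxDef) { i++; continue; } // null: skip a slot
      int runStart = i;
      while (i < definitionLevels.length && definitionLevels[i] == maxDef) i++;
      // One bulk copy per run instead of one copy per value.
      System.out.println("copy run [" + runStart + ", " + i + ")");
    }
  }
}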

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
deleted file mode 100644
index 17759d3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
-import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.vector.NullableDateVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import org.joda.time.DateTimeUtils;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import java.math.BigDecimal;
-
-class NullableFixedByteAlignedReader extends NullableColumnReader {
-
-  protected byte[] bytes;
-
-  NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-  }
-
-  // this method is called by its superclass during a read loop
-  @Override
-  protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
-    this.recordsReadInThisIteration = recordsToReadInThisPass;
-
-    // set up metadata
-    this.readStartInBytes = pageReadStatus.readPosInBytes;
-    this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-    this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
-    this.bytes = pageReadStatus.pageDataByteArray;
-    
-    // fill in data.
-    vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
-  }
-
-  public static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader {
-
-    protected int dataTypeLengthInBytes;
-
-    NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
-      this.recordsReadInThisIteration = recordsToReadInThisPass;
-
-      // set up metadata
-      this.readStartInBytes = pageReadStatus.readPosInBytes;
-      this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-      this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
-      this.bytes = pageReadStatus.pageDataByteArray;
-
-      dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
-      for (int i = 0; i < recordsReadInThisIteration; i++) {
-        addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
-      }
-    }
-
-    abstract void addNext(int start, int index);
-  }
-
-  public static class NullableDateReader extends NullableConvertedReader {
-
-    NullableDateVector dateVector;
-
-    NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      dateVector = (NullableDateVector) v;
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
-    }
-
-    // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared
-    public static int readIntLittleEndian(byte[] in, int offset) {
-      int ch4 = in[offset] & 0xff;
-      int ch3 = in[offset + 1] & 0xff;
-      int ch2 = in[offset + 2] & 0xff;
-      int ch1 = in[offset + 3] & 0xff;
-      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-    }
-
-  }
-
-  public static class NullableDecimal28Reader extends NullableConvertedReader {
-
-    NullableDecimal28SparseVector decimal28Vector;
-
-    NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                            boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      decimal28Vector = (NullableDecimal28SparseVector) v;
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      int width = NullableDecimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
-      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
-    }
-  }
-
-  public static class NullableDecimal38Reader extends NullableConvertedReader {
-
-    NullableDecimal38SparseVector decimal38Vector;
-
-    NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                            boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      decimal38Vector = (NullableDecimal38SparseVector) v;
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      int width = NullableDecimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
-      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
deleted file mode 100644
index 76cc937..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-
-class NullableFixedByteAlignedReaders {
-
-  public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
-                                                      ColumnDescriptor columnDescriptor,
-                                                      ColumnChunkMetaData columnChunkMetaData,
-                                                      boolean fixedLength,
-                                                      ValueVector valueVec,
-                                                      SchemaElement schemaElement) throws ExecutionSetupException {
-    if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-      return new NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-              fixedLength, valueVec, schemaElement);
-    } else {
-      if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
-        return new NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-              fixedLength, (NullableBigIntVector)valueVec, schemaElement);
-      }
-      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
-        return new NullableDicationaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableIntVector)valueVec, schemaElement);
-      }
-      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
-        return new NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
-      }
-      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
-        return new NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
-      }
-      else{
-        throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
-      }
-    }
-  }
-
-  private static class NullableFixedByteAlignedReader extends NullableColumnReader {
-    private byte[] bytes;
-
-    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    // this method is called by its superclass during a read loop
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-      this.recordsReadInThisIteration = recordsToReadInThisPass;
-
-      // set up metadata
-      this.readStartInBytes = pageReadStatus.readPosInBytes;
-      this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-      this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
-      this.bytes = pageReadStatus.pageDataByteArray;
-
-      // fill in data.
-      vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
-    }
-  }
-
-  private static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
-
-    private byte[] bytes;
-
-    NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                 ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
-                                 SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    // this method is called by its superclass during a read loop
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-      if (usingDictionary) {
-        for (int i = 0; i < recordsToReadInThisPass; i++){
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readInteger());
-        }
-      }
-    }
-  }
-
-  private static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
-
-    private byte[] bytes;
-
-    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
-                                   SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    // this method is called by its superclass during a read loop
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-      for (int i = 0; i < recordsToReadInThisPass; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readLong());
-      }
-    }
-  }
-
-  private static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
-
-    private byte[] bytes;
-
-    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
-                                   SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    // this method is called by its superclass during a read loop
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-      for (int i = 0; i < recordsToReadInThisPass; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readFloat());
-      }
-    }
-  }
-
-  private static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
-
-    private byte[] bytes;
-
-    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
-                                  SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    // this method is called by its superclass during a read loop
-    @Override
-    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-      for (int i = 0; i < recordsToReadInThisPass; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readDouble());
-      }
-    }
-  }
-
-}
\ No newline at end of file

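The factory method deleted above selects a reader in two steps: column chunks without PLAIN_DICTIONARY encoding share the bulk byte-copying reader, while dictionary-encoded chunks dispatch on the primitive type. A minimal standalone sketch of that selection shape follows; the enums and Reader interface are illustrative stand-ins, not the Parquet or Drill types.

    // Standalone sketch of the reader-selection pattern above; TypeName,
    // Enc and Reader are hypothetical stand-ins, not Parquet/Drill classes.
    import java.util.EnumSet;
    import java.util.Set;

    public class ReaderDispatchSketch {
      enum TypeName { INT32, INT64, FLOAT, DOUBLE }
      enum Enc { PLAIN, PLAIN_DICTIONARY }

      interface Reader { String describe(); }

      static Reader select(Set<Enc> encodings, TypeName type) {
        if (!encodings.contains(Enc.PLAIN_DICTIONARY)) {
          // non-dictionary columns share one bulk byte-copying reader
          return () -> "fixed byte-aligned reader";
        }
        switch (type) { // dictionary-encoded columns decode value by value
          case INT64:  return () -> "dictionary BigInt reader";
          case INT32:  return () -> "dictionary Int reader";
          case FLOAT:  return () -> "dictionary Float4 reader";
          case DOUBLE: return () -> "dictionary Float8 reader";
          default:
            throw new IllegalArgumentException("Unsupported nullable column type " + type);
        }
      }

      public static void main(String[] args) {
        System.out.println(select(EnumSet.of(Enc.PLAIN), TypeName.INT64).describe());
        System.out.println(select(EnumSet.of(Enc.PLAIN_DICTIONARY), TypeName.INT32).describe());
      }
    }
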
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
deleted file mode 100644
index 3ad1d6c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.proto.SchemaDefProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.bytes.BytesInput;
-import parquet.column.Dictionary;
-import parquet.column.Encoding;
-import parquet.column.ValuesType;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
-import parquet.column.values.ValuesReader;
-import parquet.column.values.dictionary.DictionaryValuesReader;
-import parquet.format.PageHeader;
-import parquet.format.PageType;
-import parquet.format.Util;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-
-// class to keep track of the current page and the read position within a column chunk
-final class PageReadStatus {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReadStatus.class);
-
-  private final ColumnReader parentColumnReader;
-  private final ColumnDataReader dataReader;
-  // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
-  Page currentPage;
-  // buffer to store bytes of current page
-  byte[] pageDataByteArray;
-  // read position in the current page (within the ByteBuf called bufferWithAllData in ParquetRecordReader)
-  long readPosInBytes;
-  // bit shift needed for the next page if the last one did not line up with a byte boundary
-  int bitShift;
-  // storage space for extra bits at the end of a page if they did not line up with a byte boundary
-  // prevents the need to keep the entire last page, as these leftover bits need to be added to the next batch
-  //byte extraBits;
-  // the number of values read out of the last page
-  int valuesRead;
-  int byteLength;
-  //int rowGroupIndex;
-  ValuesReader definitionLevels;
-  ValuesReader valueReader;
-  Dictionary dictionary;
-  PageHeader pageHeader = null;
-
-  PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
-    this.parentColumnReader = parentStatus;
-
-    long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
-    long start = columnChunkMetaData.getFirstDataPageOffset();
-    try {
-      FSDataInputStream f = fs.open(path);
-      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
-      if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-        f.seek(columnChunkMetaData.getDictionaryPageOffset());
-        PageHeader pageHeader = Util.readPageHeader(f);
-        assert pageHeader.type == PageType.DICTIONARY_PAGE;
-        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-            .decompress( //
-                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
-                pageHeader.getUncompressed_page_size(), //
-                parentColumnReader.columnChunkMetaData.getCodec());
-        DictionaryPage page = new DictionaryPage(
-            bytesIn,
-            pageHeader.uncompressed_page_size,
-            pageHeader.dictionary_page_header.num_values,
-            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
-        );
-        this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      }
-    } catch (IOException e) {
-      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + path.getName(), e);
-    }
-    
-  }
-
-  
-  /**
-   * Grab the next page.
-   *
-   * @return - if another page was present
-   * @throws java.io.IOException
-   */
-  public boolean next() throws IOException {
-
-    currentPage = null;
-    valuesRead = 0;
-
-    // TODO - the metadata for total size appears to be incorrect for Impala-generated files; need to find the cause
-    // and submit a bug report
-    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
-      return false;
-    }
-
-    // next, we need to decompress the bytes
-    // TODO - figure out whether we need to handle multiple dictionary pages; I believe a chunk is limited to one.
-    // If several dictionary pages are possible, this loop may be clobbering parts of the dictionary.
-    do {
-      pageHeader = dataReader.readPageHeader();
-      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        logger.debug("dictionary page encoding: {}", pageHeader.dictionary_page_header.getEncoding());
-        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-            .decompress( //
-                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
-                pageHeader.getUncompressed_page_size(), //
-                parentColumnReader.columnChunkMetaData.getCodec());
-        DictionaryPage page = new DictionaryPage(
-            bytesIn,
-            pageHeader.uncompressed_page_size,
-            pageHeader.dictionary_page_header.num_values,
-            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
-        );
-        this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
-      }
-    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
-
-    BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-        .decompress( //
-            dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // 
-            pageHeader.getUncompressed_page_size(), //
-            parentColumnReader.columnChunkMetaData.getCodec());
-    currentPage = new Page(
-        bytesIn,
-        pageHeader.data_page_header.num_values,
-        pageHeader.uncompressed_page_size,
-        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
-        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
-        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
-    );
-
-    byteLength = pageHeader.uncompressed_page_size;
-
-    if (currentPage == null) {
-      return false;
-    }
-
-    pageDataByteArray = currentPage.getBytes().toByteArray();
-
-    readPosInBytes = 0;
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
-      parentColumnReader.currDefLevel = -1;
-      if (!currentPage.getValueEncoding().usesDictionary()) {
-        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
-        readPosInBytes = definitionLevels.getNextOffset();
-        if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-          valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-          valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
-        }
-      } else {
-        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
-        readPosInBytes = definitionLevels.getNextOffset();
-        valueReader = new DictionaryValuesReader(dictionary);
-        valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
-        this.parentColumnReader.usingDictionary = true;
-      }
-    }
-    return true;
-  }
-  
-  public void clear(){
-    this.dataReader.clear();
-  }
-}

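PageReadStatus.next(), deleted above, has one piece of non-obvious control flow: it loops past any dictionary pages (initializing the dictionary as a side effect) before decompressing and wrapping the next data page. A condensed, dependency-free sketch of that loop; Header, PageType, and the page iterator here are hypothetical stand-ins for the parquet-format classes.

    // Sketch of the skip-dictionary-pages loop in next(); all types here
    // are illustrative, not the parquet-format classes.
    import java.util.Iterator;
    import java.util.List;

    public class PageLoopSketch {
      enum PageType { DICTIONARY_PAGE, DATA_PAGE }

      static final class Header {
        final PageType type; final int values;
        Header(PageType type, int values) { this.type = type; this.values = values; }
      }

      static Header nextDataPage(Iterator<Header> pages) {
        Header h;
        do {
          if (!pages.hasNext()) return null; // column chunk exhausted
          h = pages.next();
          if (h.type == PageType.DICTIONARY_PAGE) {
            // the real reader decompresses the page and initializes
            // the dictionary here before continuing the loop
          }
        } while (h.type == PageType.DICTIONARY_PAGE);
        return h; // first data page after any dictionary pages
      }

      public static void main(String[] args) {
        Iterator<Header> pages = List.of(
            new Header(PageType.DICTIONARY_PAGE, 0),
            new Header(PageType.DATA_PAGE, 100)).iterator();
        System.out.println(nextDataPage(pages).values); // prints 100
      }
    }
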
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
deleted file mode 100644
index c0720a9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-
-public class ParquetFixedWidthDictionaryReader extends ColumnReader{
-
-  ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
-                                    SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-  }
-
-  @Override
-  public void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
-    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
-        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-    int defLevel;
-    for (int i = 0; i < recordsReadInThisIteration; i++){
-      defLevel = pageReadStatus.definitionLevels.readInteger();
-      // if the value is defined
-      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
-        if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
-          ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
-              pageReadStatus.valueReader.readLong() );
-      }
-      // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
-    }
-  }
-}

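The readField implementation deleted above interleaves two streams: one definition level per record, and a value stream containing only the defined values. A value is consumed only when the definition level equals the column's maximum; null slots are skipped entirely, relying on the zero-filled nullability bits. A toy model of that interleaving, with plain arrays standing in for the Parquet level and value readers:

    // Toy model of definition-level driven reading; plain arrays stand in
    // for the Parquet definition-level and value readers.
    import java.util.Arrays;

    public class DefLevelSketch {
      public static void main(String[] args) {
        int maxDefLevel = 1;
        int[] defLevels = {1, 0, 1, 1, 0};       // 0 means null for a flat nullable column
        long[] values = {10L, 20L, 30L};         // only defined values are encoded
        Long[] out = new Long[defLevels.length]; // stand-in for a nullable value vector

        int valueIdx = 0;
        for (int i = 0; i < defLevels.length; i++) {
          if (defLevels[i] == maxDefLevel) {
            out[i] = values[valueIdx++];         // defined: consume one value
          }                                      // else: slot stays null (zero bit)
        }
        System.out.println(Arrays.toString(out)); // [10, null, 20, 30, null]
      }
    }
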
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 0189c9b..37d6403 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -64,7 +64,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
   private final DrillbitContext context;
-  static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+  public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
   private CodecFactoryExposer codecFactoryExposer;
   private final DrillFileSystem fs;
   private final ParquetFormatMatcher formatMatcher;

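The lone change to ParquetFormatPlugin widens parquetMetadataConverter from package-private to public, so code outside org.apache.drill.exec.store.parquet can reach it. As an illustration only (the caller class below is hypothetical and needs the Drill sources on the classpath; it is not part of the commit), the relocated readers can now reference the converter like this:

    // Hypothetical caller from the new columnreaders package; this reference
    // compiles only because the field is now public. Not part of the commit.
    package org.apache.drill.exec.store.parquet.columnreaders;

    import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;

    class ConverterAccessSketch {
      static Object metadataConverter() {
        return ParquetFormatPlugin.parquetMetadataConverter; // was package-private
      }
    }
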
