drill-commits mailing list archives

From jacq...@apache.org
Subject [13/17] DRILL-945: Implementation of repeated reader and writer for parquet. Includes a fairly substantial refactoring of the overall reader structure.
Date Tue, 29 Jul 2014 15:38:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
deleted file mode 100644
index 703ad1f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal28Reader;
-import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal38Reader;
-import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal28Reader;
-import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal38Reader;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal28Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal38Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal28Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal38Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarBinaryColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarCharColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarBinaryColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarCharColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
-import org.apache.drill.exec.vector.Decimal28SparseVector;
-import org.apache.drill.exec.vector.Decimal38SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.ConvertedType;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-public class ParquetRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
-
-  // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
-  private static final int NUMBER_OF_VECTORS = 1;
-  private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
-  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb, expressed in bits
-  private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
-
-  // TODO - should probably find a smarter way to set this, currently 1 megabyte
-  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
-  public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
-  private static final String SEPERATOR = System.getProperty("file.separator");
-
-
-  // used for clearing the last n bits of a byte
-  public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
-  // used for clearing the first n bits of a byte
-  public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
-
-  private int bitWidthAllFixedFields;
-  private boolean allFieldsFixedLength;
-  private int recordsPerBatch;
-  private long totalRecords;
-  private long rowGroupOffset;
-
-  private List<ColumnReader> columnStatuses;
-  FileSystem fileSystem;
-  private long batchSize;
-  Path hadoopPath;
-  private VarLenBinaryReader varLengthReader;
-  private ParquetMetadata footer;
-  private List<SchemaPath> columns;
-
-  public CodecFactoryExposer getCodecFactoryExposer() {
-    return codecFactoryExposer;
-  }
-
-  private final CodecFactoryExposer codecFactoryExposer;
-
-  int rowGroupIndex;
-
-  public ParquetRecordReader(FragmentContext fragmentContext, //
-                             String path, //
-                             int rowGroupIndex, //
-                             FileSystem fs, //
-                             CodecFactoryExposer codecFactoryExposer, //
-                             ParquetMetadata footer, //
-                             List<SchemaPath> columns) throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
-        columns);
-  }
-
-  public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
-                             String path, int rowGroupIndex, FileSystem fs,
-                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
-                             List<SchemaPath> columns) throws ExecutionSetupException {
-    hadoopPath = new Path(path);
-    fileSystem = fs;
-    this.codecFactoryExposer = codecFactoryExposer;
-    this.rowGroupIndex = rowGroupIndex;
-    this.batchSize = batchSize;
-    this.footer = footer;
-    this.columns = columns;
-  }
-
-  public int getRowGroupIndex() {
-    return rowGroupIndex;
-  }
-
-  public int getBitWidthAllFixedFields() {
-    return bitWidthAllFixedFields;
-  }
-
-  public long getBatchSize() {
-    return batchSize;
-  }
-
-  /**
-   * @param type a fixed length type from the parquet library enum
-   * @return the length of the type in bits
-   */
-  public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
-    switch (type) {
-      case INT64:   return 64;
-      case INT32:   return 32;
-      case BOOLEAN: return 1;
-      case FLOAT:   return 32;
-      case DOUBLE:  return 64;
-      case INT96:   return 96;
-      // binary and fixed length byte array
-      default:
-        throw new IllegalStateException("Length cannot be determined for type " + type);
-    }
-  }
-
-  private boolean fieldSelected(MaterializedField field){
-    // TODO - not sure if this is how we want to represent this
-    // for now it makes the existing tests pass, simply selecting
-    // all available data if no columns are provided
-    if (this.columns != null){
-      for (SchemaPath expr : this.columns){
-        if ( field.matches(expr)){
-          return true;
-        }
-      }
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-
-    columnStatuses = new ArrayList<>();
-    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
-    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
-    allFieldsFixedLength = true;
-    ColumnDescriptor column;
-    ColumnChunkMetaData columnChunkMetaData;
-    int columnsToScan = 0;
-
-    MaterializedField field;
-    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
-    FileMetaData fileMetaData;
-
-    // TODO - figure out how to deal with this better once we add nested reading; note: also look at where this map is used below
-    // store a map from column name to converted types if they are non-null
-    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
-    fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
-    for (SchemaElement se : fileMetaData.getSchema()) {
-      schemaElements.put(se.getName(), se);
-    }
-
-    // loop to add up the length of the fixed width columns and build the schema
-    for (int i = 0; i < columns.size(); ++i) {
-      column = columns.get(i);
-      logger.debug("name: " + fileMetaData.getSchema().get(i).name);
-      SchemaElement se = schemaElements.get(column.getPath()[0]);
-      MajorType mt = toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
-      field = MaterializedField.create(toFieldName(column.getPath()),mt);
-      if ( ! fieldSelected(field)){
-        continue;
-      }
-      columnsToScan++;
-      // sum the lengths of all of the fixed length fields
-      if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-        // There is no support for the fixed binary type yet in parquet; leaving a task here as a reminder
-        // TODO - implement this when the feature is added upstream
-          if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
-              bitWidthAllFixedFields += se.getType_length() * 8;
-          } else {
-            bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
-          }
-      } else {
-        allFieldsFixedLength = false;
-      }
-    }
-    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
-
-    // none of the columns in the parquet file matched the requested columns from the query
-    if (columnsToScan == 0){
-      throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
-    }
-    if (allFieldsFixedLength) {
-      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
-    }
-    else {
-      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
-    }
-
-    try {
-      ValueVector v;
-      ConvertedType convertedType;
-      SchemaElement schemaElement;
-      ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
-      ArrayList<NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
-      // initialize all of the column read status objects
-      boolean fieldFixedLength = false;
-      for (int i = 0; i < columns.size(); ++i) {
-        column = columns.get(i);
-        columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
-        schemaElement = schemaElements.get(column.getPath()[0]);
-        convertedType = schemaElement.getConverted_type();
-        MajorType type = toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
-        field = MaterializedField.create(toFieldName(column.getPath()), type);
-        // the field was not requested to be read
-        if ( ! fieldSelected(field)) continue;
-
-        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
-        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-          createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
-            schemaElement);
-        } else {
-          // create a reader and add it to the appropriate list
-          getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement, varLengthColumns, nullableVarLengthColumns);
-        }
-      }
-      varLengthReader = new VarLenBinaryReader(this, varLengthColumns, nullableVarLengthColumns);
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  private SchemaPath toFieldName(String[] paths) {
-    return SchemaPath.getCompoundPath(paths);
-  }
-
-  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
-    if (column.getMaxDefinitionLevel() == 0) {
-      return TypeProtos.DataMode.REQUIRED;
-    } else {
-      return TypeProtos.DataMode.OPTIONAL;
-    }
-  }
-
-  private void resetBatch() {
-    for (ColumnReader column : columnStatuses) {
-      column.valuesReadInCurrentPass = 0;
-    }
-    for (VarLengthColumn r : varLengthReader.columns){
-      r.valuesReadInCurrentPass = 0;
-    }
-    for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
-      r.valuesReadInCurrentPass = 0;
-    }
-  }
-
-  /**
-   * @param fixedLength - true if the column is fixed width
-   * @param descriptor - the parquet column descriptor for this column
-   * @param columnChunkMetaData - metadata for this column in the current row group
-   * @param allocateSize - the size of the vector to create
-   * @return true once a reader for the column has been added to columnStatuses
-   * @throws SchemaChangeException
-   */
-  private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
-                                          ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
-                                          SchemaElement schemaElement)
-      throws SchemaChangeException, ExecutionSetupException {
-    ConvertedType convertedType = schemaElement.getConverted_type();
-    // if the column is required
-    if (descriptor.getMaxDefinitionLevel() == 0){
-      if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
-        columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v, schemaElement));
-      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
-        int length = schemaElement.type_length;
-        if (length <= 12) {
-          columnStatuses.add(new Decimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
-        } else if (length <= 16) {
-          columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
-        }
-      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
-        columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
-      } else{
-        if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-          columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData,
-              fixedLength, v, schemaElement));
-        } else {
-          columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
-              fixedLength, v, schemaElement));
-        }
-      }
-      return true;
-    }
-    else { // if the column is nullable
-      if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
-        columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v, schemaElement));
-      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
-        columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
-      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
-        int length = schemaElement.type_length;
-        if (length <= 12) {
-          columnStatuses.add(new NullableDecimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
-        } else if (length <= 16) {
-          columnStatuses.add(new NullableDecimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
-        }
-      } else {
-        columnStatuses.add(NullableFixedByteAlignedReaders.getNullableColumnReader(this, allocateSize, descriptor,
-            columnChunkMetaData, fixedLength, v, schemaElement));
-      }
-      return true;
-    }
-  }
-
- public void readAllFixedFields(long recordsToRead, ColumnReader firstColumnStatus) throws IOException {
-
-   for (ColumnReader crs : columnStatuses){
-     crs.readAllFixedFields(recordsToRead, firstColumnStatus);
-   }
- }
-
-  @Override
-  public int next() {
-    resetBatch();
-    long recordsToRead = 0;
-    try {
-      ColumnReader firstColumnStatus;
-      if (columnStatuses.size() > 0){
-        firstColumnStatus = columnStatuses.iterator().next();
-      }
-      else{
-        if (varLengthReader.columns.size() > 0){
-          firstColumnStatus = varLengthReader.columns.iterator().next();
-        }
-        else{
-         firstColumnStatus = varLengthReader.nullableColumns.iterator().next();
-        }
-      }
-
-      if (allFieldsFixedLength) {
-        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
-      } else {
-        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
-
-        // going to incorporate looking at length of values and copying the data into a single loop, hopefully it won't
-        // get too complicated
-
-        // loop through variable length data to find the maximum number of records that will fit in this batch
-        // this will be a bit annoying if we want to loop through row groups, columns, pages and then individual variable
-        // length values...
-        // jacques believes that variable length fields will be encoded as |length|value|length|value|...
-        // cannot find more information on this right now, will keep looking
-      }
-
-//      logger.debug("records to read in this pass: {}", recordsToRead);
-      if (allFieldsFixedLength) {
-        readAllFixedFields(recordsToRead, firstColumnStatus);
-      } else { // variable length columns
-        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
-        readAllFixedFields(fixedRecordsToRead, firstColumnStatus);
-      }
-
-      return firstColumnStatus.valuesReadInCurrentPass;
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
-                                               TypeProtos.DataMode mode, SchemaElement schemaElement) {
-    return toMajorType(primitiveTypeName, 0, mode, schemaElement);
-  }
-
-  // TODO - move this into ParquetTypeHelper and use code generation to create the list
-  static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
-                                               TypeProtos.DataMode mode, SchemaElement schemaElement) {
-    ConvertedType convertedType = schemaElement.getConverted_type();
-    switch (mode) {
-
-      case OPTIONAL:
-        switch (primitiveTypeName) {
-          case BINARY:
-            if (convertedType == null) {
-              return Types.optional(TypeProtos.MinorType.VARBINARY);
-            }
-            switch (convertedType) {
-              case UTF8:
-                return Types.optional(MinorType.VARCHAR);
-              case DECIMAL:
-                return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case INT64:
-            if (convertedType == null) {
-              return Types.optional(TypeProtos.MinorType.BIGINT);
-            }
-            switch(convertedType) {
-              case DECIMAL:
-                return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
-              case FINETIME:
-                throw new UnsupportedOperationException();
-              case TIMESTAMP:
-                return Types.optional(MinorType.TIMESTAMP);
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case INT32:
-            if (convertedType == null) {
-              return Types.optional(TypeProtos.MinorType.INT);
-            }
-            switch(convertedType) {
-              case DECIMAL:
-                return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
-              case DATE:
-                return Types.optional(MinorType.DATE);
-              case TIME:
-                return Types.optional(MinorType.TIME);
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case BOOLEAN:
-            return Types.optional(TypeProtos.MinorType.BIT);
-          case FLOAT:
-            return Types.optional(TypeProtos.MinorType.FLOAT4);
-          case DOUBLE:
-            return Types.optional(TypeProtos.MinorType.FLOAT8);
-          // TODO - Both of these are not supported by the parquet library yet (7/3/13),
-          // but they are declared here for when they are implemented
-          case INT96:
-            return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
-                .setMode(mode).build();
-          case FIXED_LEN_BYTE_ARRAY:
-            if (convertedType == null) {
-              checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
-              return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
-                      .setWidth(length).setMode(mode).build();
-            } else if (convertedType == ConvertedType.DECIMAL) {
-              return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
-            }
-          default:
-            throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
-        }
-      case REQUIRED:
-        switch (primitiveTypeName) {
-          case BINARY:
-            if (convertedType == null) {
-              return Types.required(TypeProtos.MinorType.VARBINARY);
-            }
-            switch (convertedType) {
-              case UTF8:
-                return Types.required(MinorType.VARCHAR);
-              case DECIMAL:
-                return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case INT64:
-            if (convertedType == null) {
-              return Types.required(MinorType.BIGINT);
-            }
-            switch(convertedType) {
-              case DECIMAL:
-                return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
-              case FINETIME:
-                throw new UnsupportedOperationException();
-              case TIMESTAMP:
-                return Types.required(MinorType.TIMESTAMP);
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case INT32:
-            if (convertedType == null) {
-              return Types.required(MinorType.INT);
-            }
-            switch(convertedType) {
-              case DECIMAL:
-                return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
-              case DATE:
-                return Types.required(MinorType.DATE);
-              case TIME:
-                return Types.required(MinorType.TIME);
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case BOOLEAN:
-            return Types.required(TypeProtos.MinorType.BIT);
-          case FLOAT:
-            return Types.required(TypeProtos.MinorType.FLOAT4);
-          case DOUBLE:
-            return Types.required(TypeProtos.MinorType.FLOAT8);
-          // Both of these are not supported by the parquet library yet (7/3/13),
-          // but they are declared here for when they are implemented
-          case INT96:
-            return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
-                .setMode(mode).build();
-          case FIXED_LEN_BYTE_ARRAY:
-            if (convertedType == null) {
-              checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
-              return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
-                      .setWidth(length).setMode(mode).build();
-            } else if (convertedType == ConvertedType.DECIMAL) {
-              return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
-            }
-          default:
-            throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
-        }
-      case REPEATED:
-        switch (primitiveTypeName) {
-          case BINARY:
-            if (convertedType == null) {
-              return Types.repeated(TypeProtos.MinorType.VARBINARY);
-            }
-            switch (schemaElement.getConverted_type()) {
-              case UTF8:
-                return Types.repeated(MinorType.VARCHAR);
-              case DECIMAL:
-                return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case INT64:
-            if (convertedType == null) {
-              return Types.repeated(MinorType.BIGINT);
-            }
-            switch(convertedType) {
-              case DECIMAL:
-                return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
-              case FINETIME:
-                throw new UnsupportedOperationException();
-              case TIMESTAMP:
-                return Types.repeated(MinorType.TIMESTAMP);
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case INT32:
-            if (convertedType == null) {
-              return Types.repeated(MinorType.INT);
-            }
-            switch(convertedType) {
-              case DECIMAL:
-                return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
-              case DATE:
-                return Types.repeated(MinorType.DATE);
-              case TIME:
-                return Types.repeated(MinorType.TIME);
-              default:
-                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
-            }
-          case BOOLEAN:
-            return Types.repeated(TypeProtos.MinorType.BIT);
-          case FLOAT:
-            return Types.repeated(TypeProtos.MinorType.FLOAT4);
-          case DOUBLE:
-            return Types.repeated(TypeProtos.MinorType.FLOAT8);
-          // Both of these are not supported by the parquet library yet (7/3/13),
-          // but they are declared here for when they are implemented
-          case INT96:
-            return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
-                .setMode(mode).build();
-          case FIXED_LEN_BYTE_ARRAY:
-            if (convertedType == null) {
-              checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
-              return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
-                      .setWidth(length).setMode(mode).build();
-            } else if (convertedType == ConvertedType.DECIMAL) {
-              return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
-            }
-          default:
-            throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
-        }
-    }
-    throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode);
-  }
-
-  private static void getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
-                                        SchemaElement schemaElement, List<VarLengthColumn> varLengthColumns,
-                                        List<NullableVarLengthColumn> nullableVarLengthColumns) throws ExecutionSetupException {
-    ConvertedType convertedType = schemaElement.getConverted_type();
-    switch (descriptor.getMaxDefinitionLevel()) {
-      case 0:
-        if (convertedType == null) {
-          varLengthColumns.add(new VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement));
-          return;
-        }
-        switch (convertedType) {
-          case UTF8:
-            varLengthColumns.add(new VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement));
-            return;
-          case DECIMAL:
-            if (v instanceof Decimal28SparseVector) {
-              varLengthColumns.add(new Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement));
-              return;
-            } else if (v instanceof Decimal38SparseVector) {
-              varLengthColumns.add(new Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement));
-              return;
-            }
-          default:
-        }
-      default:
-        if (convertedType == null) {
-          nullableVarLengthColumns.add(new NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement));
-          return;
-        }
-        switch (convertedType) {
-          case UTF8:
-            nullableVarLengthColumns.add(new NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement));
-            return;
-          case DECIMAL:
-            if (v instanceof NullableDecimal28SparseVector) {
-              nullableVarLengthColumns.add(new NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement));
-              return;
-            } else if (v instanceof NullableDecimal38SparseVector) {
-              nullableVarLengthColumns.add(new NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement));
-              return;
-            }
-          default:
-        }
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  private static MinorType getDecimalType(SchemaElement schemaElement) {
-    return schemaElement.getPrecision() <= 28 ? MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
-  }
-
-  static String join(String delimiter, String... str) {
-    StringBuilder builder = new StringBuilder();
-    int i = 0;
-    for (String s : str) {
-      builder.append(s);
-      if (i < str.length - 1) { // only append the delimiter between elements, not after the last one
-        builder.append(delimiter);
-      }
-      i++;
-    }
-    return builder.toString();
-  }
-
-  @Override
-  public void cleanup() {
-    for (ColumnReader column : columnStatuses) {
-      column.clear();
-    }
-    columnStatuses.clear();
-
-    for (VarLengthColumn r : varLengthReader.columns){
-      r.clear();
-    }
-    for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
-      r.clear();
-    }
-    varLengthReader.columns.clear();
-    varLengthReader.nullableColumns.clear();
-  }
-}
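
A note on the fixed width batch sizing in setup() above: when every selected column is fixed width, recordsPerBatch is the smallest of batchSize / bitWidthAllFixedFields, the row group's value count, and 65535, with batchSize expressed in bits. A minimal sketch of that arithmetic with hypothetical numbers (the column mix and row group size below are invented purely for illustration):

    // Hypothetical example of the recordsPerBatch calculation from setup().
    long batchSizeInBits = 256L * 1024 * 8;            // DEFAULT_BATCH_LENGTH_IN_BITS: 256kb in bits
    int bitWidthAllFixedFields = 64 + 32;              // e.g. one INT64 column and one INT32 column
    long valueCountInRowGroup = 1_000_000;             // hypothetical row group value count
    int recordsPerBatch = (int) Math.min(
        Math.min(batchSizeInBits / bitWidthAllFixedFields, valueCountInRowGroup),
        65535);                                        // evaluates to 21845 for these numbers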

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index b26f688..a336316 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -52,6 +52,7 @@ import static java.lang.Math.min;
 import static java.lang.String.format;
 
 public class ParquetRecordWriter extends ParquetOutputRecordWriter {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
 
   private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -147,7 +148,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
       long memSize = store.memSize();
       if (memSize > blockSize) {
-        System.out.println("Reached block size " + blockSize);
+        logger.debug("Reached block size " + blockSize);
         flush();
         newSchema();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
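
As an aside on the hunk above: slf4j log statements can also use {} placeholders, so the message string is only built when the level is enabled. A minimal sketch of the equivalent call (not part of this patch):

    // Equivalent parameterized form; the message is only assembled when debug logging is enabled.
    logger.debug("Reached block size {}", blockSize);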

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index df6581f..b4f02fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.drill.exec.store.RecordReader;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
deleted file mode 100644
index 813a799..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
-
-import parquet.bytes.BytesUtils;
-
-public class VarLenBinaryReader {
-
-  ParquetRecordReader parentReader;
-  final List<VarLengthColumn> columns;
-  final List<NullableVarLengthColumn> nullableColumns;
-
-  public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns,
-                            List<NullableVarLengthColumn> nullableColumns){
-    this.parentReader = parentReader;
-    this.nullableColumns = nullableColumns;
-    this.columns = columns;
-  }
-
-  /**
-   * Reads as many variable length values as possible.
-   *
-   * @param recordsToReadInThisPass - the number of records recommended for reading from the reader
-   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from
-   * @return - the number of records read in this pass, which the caller uses as the number of fixed width records to read
-   * @throws IOException
-   */
-  public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
-
-    long recordsReadInCurrentPass = 0;
-    int lengthVarFieldsInCurrentRecord;
-    boolean rowGroupFinished = false;
-    byte[] bytes;
-    // reset the per-pass read counters on each column before this pass
-    for (ColumnReader columnReader : columns) {
-      columnReader.bytesReadInCurrentPass = 0;
-      columnReader.valuesReadInCurrentPass = 0;
-    }
-    // same for the nullable columns
-    for (NullableVarLengthColumn columnReader : nullableColumns) {
-      columnReader.bytesReadInCurrentPass = 0;
-      columnReader.valuesReadInCurrentPass = 0;
-      columnReader.nullsRead = 0;
-    }
-    outer: do {
-      lengthVarFieldsInCurrentRecord = 0;
-      for (VarLengthColumn columnReader : columns) {
-        if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
-          rowGroupFinished = true;
-          break;
-        }
-        if (columnReader.pageReadStatus.currentPage == null
-            || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
-          columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
-          if (!columnReader.pageReadStatus.next()) {
-            rowGroupFinished = true;
-            break;
-          }
-        }
-        bytes = columnReader.pageReadStatus.pageDataByteArray;
-
-        // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
-        columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
-            (int) columnReader.pageReadStatus.readPosInBytes);
-        lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-
-        if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
-          break outer;
-        }
-
-      }
-      for (NullableVarLengthColumn columnReader : nullableColumns) {
-        // check to make sure there is capacity for the next value (for nullables this is a check to see if there is
-        // still space in the nullability recording vector)
-        if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
-          rowGroupFinished = true;
-          break;
-        }
-        if (columnReader.pageReadStatus.currentPage == null
-            || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
-          if (!columnReader.pageReadStatus.next()) {
-            rowGroupFinished = true;
-            break;
-          } else {
-            columnReader.currDictVal = null;
-          }
-        }
-        bytes = columnReader.pageReadStatus.pageDataByteArray;
-        // we need to read all of the lengths to determine if this value will fit in the current vector.
-        // As we can only read each definition level once, we have to store the last one, since we will need it
-        // at the start of the next read if, after reading all of the varlength values in this record,
-        // we decide that it will not fit in this batch
-        if ( columnReader.currDefLevel == -1 ) {
-          columnReader.currDefLevel = columnReader.pageReadStatus.definitionLevels.readInteger();
-        }
-        if ( columnReader.columnDescriptor.getMaxDefinitionLevel() > columnReader.currDefLevel){
-          columnReader.currentValNull = true;
-          columnReader.dataTypeLengthInBits = 0;
-          columnReader.nullsRead++;
-          continue;// field is null, no length to add to data vector
-        }
-
-        if (columnReader.usingDictionary) {
-          if (columnReader.currDictVal == null) {
-            columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes();
-          }
-          // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
-          columnReader.dataTypeLengthInBits = columnReader.currDictVal.length();
-        }
-        else {
-          // re-purposing  this field here for length in BYTES to prevent repetitive multiplication/division
-          columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
-              (int) columnReader.pageReadStatus.readPosInBytes);
-        }
-        lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-
-        if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
-          break outer;
-        }
-      }
-      // check that the next record will fit in the batch
-      if (rowGroupFinished || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + lengthVarFieldsInCurrentRecord
-          > parentReader.getBatchSize()){
-        break outer;
-      }
-      for (VarLengthColumn columnReader : columns) {
-        bytes = columnReader.pageReadStatus.pageDataByteArray;
-        // again, I am re-purposing the unused field here, it is a length in BYTES, not bits
-        boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
-            (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
-        assert success;
-        columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
-        columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
-        columnReader.pageReadStatus.valuesRead++;
-        columnReader.valuesReadInCurrentPass++;
-      }
-      for (NullableVarLengthColumn columnReader : nullableColumns) {
-        bytes = columnReader.pageReadStatus.pageDataByteArray;
-        // again, I am re-purposing the unused field here, it is a length in BYTES, not bits
-        if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){
-          boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
-                (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
-          assert success;
-        }
-        columnReader.currentValNull = false;
-        columnReader.currDefLevel = -1;
-        if (columnReader.dataTypeLengthInBits > 0){
-          columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
-          columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
-        }
-        columnReader.pageReadStatus.valuesRead++;
-        columnReader.valuesReadInCurrentPass++;
-        if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
-          columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
-          columnReader.pageReadStatus.next();
-        }
-        columnReader.currDictVal = null;
-      }
-      recordsReadInCurrentPass++;
-    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
-    for (VarLengthColumn columnReader : columns) {
-      columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
-    }
-    for (NullableVarLengthColumn columnReader : nullableColumns) {
-      columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
-    }
-    return recordsReadInCurrentPass;
-  }
-}
\ No newline at end of file
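
For context on the readFields() loop in the file above: a plain-encoded BINARY value in a parquet page is stored as a 4-byte little-endian length followed by that many bytes, which is why the reader fetches an int at readPosInBytes, copies data starting at readPosInBytes + 4, and then advances by the length plus 4. A standalone sketch of the same walk over a page's byte array (the method and variable names here are illustrative, not Drill or parquet identifiers):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.ArrayList;
    import java.util.List;

    static List<byte[]> readPlainBinaryValues(byte[] pageData) {
      List<byte[]> values = new ArrayList<>();
      int pos = 0;
      while (pos < pageData.length) {
        // 4-byte little-endian length prefix, as read by BytesUtils.readIntLittleEndian(bytes, pos)
        int len = ByteBuffer.wrap(pageData, pos, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        byte[] value = new byte[len];
        System.arraycopy(pageData, pos + 4, value, 0, len);
        values.add(value);
        pos += 4 + len;   // advance past the length prefix and the value bytes
      }
      return values;
    }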

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
deleted file mode 100644
index 56f687c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store.parquet;
-
-import java.math.BigDecimal;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
-import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
-import org.apache.drill.exec.vector.Decimal28SparseVector;
-import org.apache.drill.exec.vector.Decimal38SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.io.api.Binary;
-
-public class VarLengthColumnReaders {
-
-  public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
-
-    Binary currDictVal;
-
-    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
-                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-        usingDictionary = true;
-      }
-      else {
-        usingDictionary = false;
-      }
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-
-    public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
-
-    public abstract int capacity();
-
-  }
-
-  public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader {
-
-    int nullsRead;
-    boolean currentValNull = false;
-    Binary currDictVal;
-
-    NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
-                            SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    public abstract boolean setSafe(int index, byte[] value, int start, int length);
-
-    public abstract int capacity();
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  public static class Decimal28Column extends VarLengthColumn<Decimal28SparseVector> {
-
-    protected Decimal28SparseVector decimal28Vector;
-
-    Decimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal28SparseVector v,
-                   SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      this.decimal28Vector = v;
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
-      if (index >= decimal28Vector.getValueCapacity()) {
-        return false;
-      }
-      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
-      return true;
-    }
-
-    @Override
-    public int capacity() {
-      return decimal28Vector.getData().capacity();
-    }
-  }
-
-  public static class NullableDecimal28Column extends NullableVarLengthColumn<NullableDecimal28SparseVector> {
-
-    protected NullableDecimal28SparseVector nullableDecimal28Vector;
-
-    NullableDecimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v,
-                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      nullableDecimal28Vector = v;
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
-      if (index >= nullableDecimal28Vector.getValueCapacity()) {
-        return false;
-      }
-      DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
-      nullableDecimal28Vector.getMutator().setIndexDefined(index);
-      return true;
-    }
-
-    @Override
-    public int capacity() {
-      return nullableDecimal28Vector.getData().capacity();
-    }
-  }
-
-  public static class Decimal38Column extends VarLengthColumn<Decimal38SparseVector> {
-
-    protected Decimal38SparseVector decimal28Vector;
-
-    Decimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal38SparseVector v,
-                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      decimal28Vector = v;
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
-      if (index >= decimal28Vector.getValueCapacity()) {
-        return false;
-      }
-      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
-      return true;
-    }
-
-    @Override
-    public int capacity() {
-      return decimal28Vector.getData().capacity();
-    }
-  }
-
-  public static class NullableDecimal38Column extends NullableVarLengthColumn<NullableDecimal38SparseVector> {
-
-    protected NullableDecimal38SparseVector nullableDecimal38Vector;
-
-    NullableDecimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal38SparseVector v,
-                            SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      nullableDecimal38Vector = v;
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
-      if (index >= nullableDecimal38Vector.getValueCapacity()) {
-        return false;
-      }
-      DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(),
-              schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
-      nullableDecimal38Vector.getMutator().setIndexDefined(index);
-      return true;
-    }
-
-    @Override
-    public int capacity() {
-      return nullableDecimal38Vector.getData().capacity();
-    }
-  }
-
-
-  public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
-
-    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
-    protected VarCharVector varCharVector;
-
-    VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
-                  SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      varCharVector = v;
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      boolean success;
-      if(index >= varCharVector.getValueCapacity()) return false;
-
-      if (usingDictionary) {
-        success = varCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
-            0, currDictVal.length());
-      }
-      else {
-        success = varCharVector.getMutator().setSafe(index, bytes, start, length);
-      }
-      return success;
-    }
-
-    @Override
-    public int capacity() {
-      return varCharVector.getData().capacity();
-    }
-  }
-
-  public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> {
-
-    int nullsRead;
-    boolean currentValNull = false;
-    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
-    protected NullableVarCharVector nullableVarCharVector;
-
-    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                          ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
-                          SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      nullableVarCharVector = v;
-    }
-
-    public boolean setSafe(int index, byte[] value, int start, int length) {
-      boolean success;
-      if(index >= nullableVarCharVector.getValueCapacity()) return false;
-
-      if (usingDictionary) {
-        success = nullableVarCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
-            0, currDictVal.length());
-      }
-      else {
-        success = nullableVarCharVector.getMutator().setSafe(index, value, start, length);
-      }
-      return success;
-    }
-
-    @Override
-    public int capacity() {
-      return nullableVarCharVector.getData().capacity();
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> {
-
-    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
-    protected VarBinaryVector varBinaryVector;
-
-    VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
-                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      varBinaryVector = v;
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
-      boolean success;
-      if(index >= varBinaryVector.getValueCapacity()) return false;
-
-      if (usingDictionary) {
-        success = varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
-            0, currDictVal.length());
-      }
-      else {
-        success = varBinaryVector.getMutator().setSafe(index, bytes, start, length);
-      }
-      return success;
-    }
-
-    @Override
-    public int capacity() {
-      return varBinaryVector.getData().capacity();
-    }
-  }
-
-  public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> {
-
-    int nullsRead;
-    boolean currentValNull = false;
-    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
-    protected org.apache.drill.exec.vector.NullableVarBinaryVector nullableVarBinaryVector;
-
-    NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
-                            SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      nullableVarBinaryVector = v;
-    }
-
-    public boolean setSafe(int index, byte[] value, int start, int length) {
-      boolean success;
-      if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
-
-      if (usingDictionary) {
-        success = nullableVarBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
-            0, currDictVal.length());
-      }
-      else {
-        success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length);
-      }
-      return  success;
-    }
-
-    @Override
-    public int capacity() {
-      return nullableVarBinaryVector.getData().capacity();
-    }
-
-    @Override
-    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
-      throw new UnsupportedOperationException();
-    }
-  }
-}
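
A hypothetical caller-side sketch of the setSafe contract shared by the column classes above: each implementation bounds-checks the target index against the vector's value capacity and returns false instead of writing when the vector is full, so the surrounding read loop can end the current batch. The Column interface, writeBatch method and values list below are invented purely for illustration.

  import java.util.List;

  public class SetSafeContractSketch {

    // stand-in for the setSafe methods defined on the column classes above
    interface Column {
      boolean setSafe(int index, byte[] bytes, int start, int length);
    }

    // writes values until the column reports that its vector is out of room
    static int writeBatch(Column column, List<byte[]> values) {
      int written = 0;
      for (byte[] v : values) {
        if (!column.setSafe(written, v, 0, v.length)) {
          break; // vector capacity reached; the remaining values belong to the next batch
        }
        written++;
      }
      return written;
    }
  }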

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
new file mode 100644
index 0000000..2c6e488
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+final class BitReader extends ColumnReader {
+
+  private byte currentByte;
+  private byte nextByte;
+  private byte[] bytes;
+  
+  BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+            boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+  }
+
+  @Override
+  protected void readField(long recordsToReadInThisPass) {
+
+    recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+    readStartInBytes = pageReader.readPosInBytes;
+    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+    readLength = (int) Math.ceil(readLengthInBits / 8.0);
+
+    bytes = pageReader.pageDataByteArray;
+    // standard read, using memory mapping
+    if (pageReader.bitShift == 0) {
+      ((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
+          (int) readStartInBytes, (int) readLength);
+    } else { // read individual values, because a bit shift is needed due to where the last page or batch ended
+
+      vectorData = ((BaseDataValueVector) valueVec).getData();
+      nextByte = bytes[(int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)];
+      readLengthInBits = recordsReadInThisIteration + pageReader.bitShift;
+
+      int i = 0;
+      // read individual bytes with appropriate shifting
+      for (; i < (int) readLength; i++) {
+        currentByte = nextByte;
+        currentByte = (byte) (currentByte >>> pageReader.bitShift);
+        // mask the bits about to be added from the next byte
+        currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReader.bitShift - 1]);
+        // if we are not on the last byte
+        if ((int) Math.ceil(pageReader.valuesRead / 8.0) + i < pageReader.byteLength) {
+          // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits
+          nextByte = bytes[(int) Math.ceil(pageReader.valuesRead / 8.0) + i];
+          currentByte = (byte) (currentByte | nextByte
+              << (8 - pageReader.bitShift)
+              & ParquetRecordReader.endBitMasks[8 - pageReader.bitShift - 1]);
+        }
+        vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte);
+      }
+      vectorData.setIndex(0, (valuesReadInCurrentPass / 8)
+          + (int) readLength - 1);
+      vectorData.capacity(vectorData.writerIndex() + 1);
+    }
+
+    // if the values in this page did not end on a byte boundary, store the number of bits the next page must be
+    // shifted by so that its values can be read into the vector without leaving a gap
+    if (readLengthInBits % 8 != 0) {
+      pageReader.bitShift = (int) readLengthInBits % 8;
+    } else {
+      pageReader.bitShift = 0;
+    }
+  }
+}
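
The shifted branch of BitReader.readField above stitches each output byte together from the unread high bits of one page byte and the low bits of the byte that follows it. A standalone sketch of that bit arithmetic, with the masks computed inline rather than taken from ParquetRecordReader.startBitMasks/endBitMasks; the shift amount and sample bytes are invented.

  public class BitShiftSketch {
    public static void main(String[] args) {
      int bitShift = 3;                  // the previous batch ended 3 bits into the current byte
      byte current = (byte) 0b10110101;  // the low 3 bits were already consumed
      byte next = (byte) 0b00000110;     // the following byte in the page

      // keep the (8 - bitShift) unread bits of the current byte, moved down to bit 0
      int lowBits = (current & 0xFF) >>> bitShift;
      // take bitShift bits from the next byte and move them into the high positions
      int highBits = (next << (8 - bitShift)) & 0xFF;

      byte stitched = (byte) (lowBits | highBits);
      // prints 11010110: the high three bits come from the next byte, the low five from the leftover bits
      System.out.println(Integer.toBinaryString(stitched & 0xFF));
    }
  }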

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
new file mode 100644
index 0000000..fd672d6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import java.io.IOException;
+
+public abstract class ColumnReader<V extends ValueVector> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+
+  final ParquetRecordReader parentReader;
+  
+  // Value Vector for this column
+  final V valueVec;
+
+  ColumnDescriptor getColumnDescriptor() {
+    return columnDescriptor;
+  }
+
+  // column description from the parquet library
+  final ColumnDescriptor columnDescriptor;
+  // metadata of the column, from the parquet library
+  final ColumnChunkMetaData columnChunkMetaData;
+  // status information on the current page
+  PageReader pageReader;
+
+  final SchemaElement schemaElement;
+  boolean usingDictionary;
+
+  // quick reference to see if the field is fixed length (as this requires an instanceof)
+  final boolean isFixedLength;
+
+  // counter for the total number of values read from one or more pages
+  // when a batch is filled all of these values should be the same for all of the columns
+  int totalValuesRead;
+  
+  // counter for the values that have been read in this pass (a single call to the next() method)
+  int valuesReadInCurrentPass;
+  
+  // length of single data value in bits, if the length is fixed
+  int dataTypeLengthInBits;
+  int bytesReadInCurrentPass;
+
+  protected ByteBuf vectorData;
+  // definition levels for nullable columns are read as a one-way stream of integers;
+  // for var length data, where we don't know if all of the records will fit until we've read all of them,
+  // we must store the last definition level and use it at the start of the next batch
+  int currDefLevel;
+
+  // variables for a single read pass
+  long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
+
+  protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
+    this.parentReader = parentReader;
+    this.columnDescriptor = descriptor;
+    this.columnChunkMetaData = columnChunkMetaData;
+    this.isFixedLength = fixedLength;
+    this.schemaElement = schemaElement;
+    this.valueVec =  v;
+    this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData);
+
+    if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+      if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+        dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
+      } else {
+        dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
+      }
+    }
+
+  }
+
+  public int getRecordsReadInCurrentPass() {
+    return valuesReadInCurrentPass;
+  }
+
+  public void processPages(long recordsToReadInThisPass) throws IOException {
+    reset();
+    do {
+      determineSize(recordsToReadInThisPass, 0);
+
+    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+    valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
+  }
+
+  public void clear() {
+    valueVec.clear();
+    this.pageReader.clear();
+  }
+
+  public void readValues(long recordsToRead) {
+    readField(recordsToRead);
+
+    valuesReadInCurrentPass += recordsReadInThisIteration;
+    totalValuesRead += recordsReadInThisIteration;
+    pageReader.valuesRead += recordsReadInThisIteration;
+    pageReader.readPosInBytes = readStartInBytes + readLength;
+  }
+
+  protected abstract void readField(long recordsToRead);
+
+  /**
+   * Determines the size of a single value in a variable column.
+   *
+   * Return value indicates if we have finished a row group and should stop reading
+   *
+   * @param recordsReadInCurrentPass - the number of records to read in the current pass
+   * @param lengthVarFieldsInCurrentRecord - accumulated length of the variable length fields in the current record
+   * @return - true if we should stop reading
+   * @throws IOException
+   */
+  public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException {
+
+    boolean doneReading = readPage();
+    if (doneReading)
+      return true;
+
+    doneReading = processPageData((int) recordsReadInCurrentPass);
+    if (doneReading)
+      return true;
+
+    lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+
+    doneReading = checkVectorCapacityReached();
+    if (doneReading)
+      return true;
+
+    return false;
+  }
+
+  protected void readRecords(int recordsToRead) {
+    for (int i = 0; i < recordsToRead; i++) {
+      readField(i);
+    }
+    pageReader.valuesRead += recordsToRead;
+  }
+
+  protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
+    readValues(recordsToReadInThisPass);
+    return true;
+  }
+
+  public void updatePosition() {}
+
+  public void updateReadyToReadPosition() {}
+
+  public void reset() {
+    readStartInBytes = 0;
+    readLength = 0;
+    readLengthInBits = 0;
+    recordsReadInThisIteration = 0;
+    bytesReadInCurrentPass = 0;
+    vectorData = ((BaseValueVector) valueVec).getData();
+  }
+
+  public int capacity() {
+    return (int) (valueVec.getValueCapacity() * dataTypeLengthInBits / 8.0);
+  }
+
+  // Read a page if we need more data, returns true if we need to exit the read loop
+  public boolean readPage() throws IOException {
+    if (pageReader.currentPage == null
+        || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) {
+      readRecords(pageReader.valuesReadyToRead);
+      if (pageReader.currentPage != null)
+        totalValuesRead += pageReader.currentPage.getValueCount();
+      if (!pageReader.next()) {
+        hitRowGroupEnd();
+        return true;
+      }
+      postPageRead();
+    }
+    return false;
+  }
+
+  protected int totalValuesReadAndReadyToReadInPage() {
+    return pageReader.valuesRead + pageReader.valuesReadyToRead;
+  }
+
+  protected void postPageRead() {
+    pageReader.valuesReadyToRead = 0;
+  }
+
+  protected void hitRowGroupEnd() {}
+
+  protected boolean checkVectorCapacityReached() {
+    if (bytesReadInCurrentPass + dataTypeLengthInBits > capacity()) {
+      logger.debug("Reached the capacity of the data vector in a variable length value vector.");
+      return true;
+    }
+    else if (valuesReadInCurrentPass > valueVec.getValueCapacity()){
+      return true;
+    }
+    return false;
+  }
+}
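
ColumnReader acts as a template: processPages and readValues drive the loop and the bookkeeping, while a subclass only supplies readField to move bytes from the current page into its value vector. A hypothetical minimal fixed-width subclass is sketched below purely to illustrate that contract; the actual fixed-width implementations in this change are FixedByteAlignedReader and its variants, referenced by the factory that follows.

  package org.apache.drill.exec.store.parquet.columnreaders;

  import org.apache.drill.common.exceptions.ExecutionSetupException;
  import org.apache.drill.exec.vector.BaseDataValueVector;
  import org.apache.drill.exec.vector.ValueVector;
  import parquet.column.ColumnDescriptor;
  import parquet.format.SchemaElement;
  import parquet.hadoop.metadata.ColumnChunkMetaData;

  // illustration only, not part of this change
  final class SimpleFixedWidthReader extends ColumnReader<ValueVector> {

    SimpleFixedWidthReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                           ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
                           SchemaElement schemaElement) throws ExecutionSetupException {
      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
    }

    @Override
    protected void readField(long recordsToReadInThisPass) {
      // read whatever remains in the page, capped at the number of records requested for this pass
      recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() - pageReader.valuesRead,
          recordsToReadInThisPass - valuesReadInCurrentPass);
      readStartInBytes = pageReader.readPosInBytes;
      readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
      readLength = (int) Math.ceil(readLengthInBits / 8.0);
      // fixed-width values are byte aligned, so they can be copied straight into the vector's buffer
      ((BaseDataValueVector) valueVec).getData().writeBytes(pageReader.pageDataByteArray,
          (int) readStartInBytes, (int) readLength);
    }
  }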

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
new file mode 100644
index 0000000..243744e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -0,0 +1,175 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ColumnReaderFactory {
+
+  /**
+   * @param fixedLength - whether the column holds fixed length values
+   * @param descriptor - column descriptor from the parquet library
+   * @param columnChunkMetaData - metadata of the column chunk, from the parquet library
+   * @param allocateSize - the size of the vector to create
+   * @return - a ColumnReader for the requested column
+   * @throws Exception if the parquet metadata does not match an expected configuration
+   */
+  static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor,
+                                               ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+                                               SchemaElement schemaElement)
+      throws Exception {
+    ConvertedType convertedType = schemaElement.getConverted_type();
+    // if the column is required, or repeated (in which case we just want to use this to generate the appropriate
+    // ColumnReader for actually transferring data into the data vector inside of the repeated vector)
+    if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0){
+      if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+        return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+            fixedLength, v, schemaElement);
+      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
+        int length = schemaElement.type_length;
+        if (length <= 12) {
+          return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+        } else if (length <= 16) {
+          return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+        }
+      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+        return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      } else{
+        if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+          return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+              fixedLength, v, schemaElement);
+        } else {
+          return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+              fixedLength, v, schemaElement);
+        }
+      }
+    }
+    else { // if the column is nullable
+      if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+        return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+            fixedLength, v, schemaElement);
+      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+        return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
+        int length = schemaElement.type_length;
+        if (length <= 12) {
+          return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+        } else if (length <= 16) {
+          return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+        }
+      } else {
+        return getNullableColumnReader(recordReader, allocateSize, descriptor,
+            columnChunkMetaData, fixedLength, v, schemaElement);
+      }
+    }
+    throw new Exception("Unexpected parquet metadata configuration.");
+  }
+
+  static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                          ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+                                          SchemaElement schemaElement
+  ) throws ExecutionSetupException {
+    ConvertedType convertedType = schemaElement.getConverted_type();
+    switch (descriptor.getMaxDefinitionLevel()) {
+      case 0:
+        if (convertedType == null) {
+          return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+        }
+        switch (convertedType) {
+          case UTF8:
+            return new VarLengthColumnReaders.VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
+          case DECIMAL:
+            if (v instanceof Decimal28SparseVector) {
+              return new VarLengthColumnReaders.Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement);
+            } else if (v instanceof Decimal38SparseVector) {
+              return new VarLengthColumnReaders.Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement);
+            }
+          default:
+        }
+      default:
+        if (convertedType == null) {
+          return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+        }
+        switch (convertedType) {
+          case UTF8:
+            return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
+          case DECIMAL:
+            if (v instanceof NullableDecimal28SparseVector) {
+              return new VarLengthColumnReaders.NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement);
+            } else if (v instanceof NullableDecimal38SparseVector) {
+              return new VarLengthColumnReaders.NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement);
+            }
+          default:
+        }
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+                                                             ColumnDescriptor columnDescriptor,
+                                                             ColumnChunkMetaData columnChunkMetaData,
+                                                             boolean fixedLength,
+                                                             ValueVector valueVec,
+                                                             SchemaElement schemaElement) throws ExecutionSetupException {
+    if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+      return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+          fixedLength, valueVec, schemaElement);
+    } else {
+      if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
+        return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableIntVector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
+        return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+      }
+      else{
+        throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
+      }
+    }
+  }
+}
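
The type_length thresholds used in createFixedColumnReader above (12 and 16 bytes) are the smallest two's-complement widths that can hold decimals of precision 28 and 38, which is why they map onto the Decimal28 and Decimal38 readers. A standalone sketch of that arithmetic; the class and method names are invented.

  import java.math.BigInteger;

  public class DecimalWidthSketch {

    // smallest number of bytes that can hold a signed two's-complement value of the given decimal precision
    static int minBytesForPrecision(int precision) {
      int bits = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE).bitLength() + 1; // + 1 for the sign bit
      return (bits + 7) / 8;
    }

    public static void main(String[] args) {
      System.out.println(minBytesForPrecision(28)); // 12 -> handled by the Decimal28 readers
      System.out.println(minBytesForPrecision(38)); // 16 -> handled by the Decimal38 readers
    }
  }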

