drill-commits mailing list archives

From jacq...@apache.org
Subject [11/17] DRILL-945: Implementation of repeated reader and writer for parquet. Includes a fairly substantial refactoring of the overall reader structure.
Date Tue, 29 Jul 2014 15:38:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
new file mode 100644
index 0000000..7eeeeaa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -0,0 +1,236 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.types.TypeProtos;
+import static org.apache.drill.common.types.TypeProtos.MinorType;
+import static org.apache.drill.common.types.TypeProtos.DataMode;
+import static parquet.Preconditions.checkArgument;
+
+import org.apache.drill.common.types.Types;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.schema.PrimitiveType;
+
+public class ParquetToDrillTypeConverter {
+
+  private static TypeProtos.MinorType getDecimalType(SchemaElement schemaElement) {
+    return schemaElement.getPrecision() <= 28 ? TypeProtos.MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
+  }
+
+  public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
+                                          TypeProtos.DataMode mode, SchemaElement schemaElement) {
+    ConvertedType convertedType = schemaElement.getConverted_type();
+    switch (mode) {
+
+      case OPTIONAL:
+        switch (primitiveTypeName) {
+          case BINARY:
+            if (convertedType == null) {
+              return Types.optional(TypeProtos.MinorType.VARBINARY);
+            }
+            switch (convertedType) {
+              case UTF8:
+                return Types.optional(TypeProtos.MinorType.VARCHAR);
+              case DECIMAL:
+                return Types.withScaleAndPrecision(getDecimalType(schemaElement), TypeProtos.DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case INT64:
+            if (convertedType == null) {
+              return Types.optional(TypeProtos.MinorType.BIGINT);
+            }
+            switch(convertedType) {
+              case DECIMAL:
+                return Types.withScaleAndPrecision(TypeProtos.MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+              case FINETIME:
+                throw new UnsupportedOperationException();
+              case TIMESTAMP:
+                return Types.optional(MinorType.TIMESTAMP);
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case INT32:
+            if (convertedType == null) {
+              return Types.optional(TypeProtos.MinorType.INT);
+            }
+            switch(convertedType) {
+              case DECIMAL:
+                return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+              case DATE:
+                return Types.optional(MinorType.DATE);
+              case TIME:
+                return Types.optional(MinorType.TIME);
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case BOOLEAN:
+            return Types.optional(TypeProtos.MinorType.BIT);
+          case FLOAT:
+            return Types.optional(TypeProtos.MinorType.FLOAT4);
+          case DOUBLE:
+            return Types.optional(TypeProtos.MinorType.FLOAT8);
+          // TODO - Neither of these is supported by the parquet library yet (7/3/13),
+          // but they are declared here for when support is added
+          case INT96:
+            return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+                .setMode(mode).build();
+          case FIXED_LEN_BYTE_ARRAY:
+            if (convertedType == null) {
+              checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+              return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+                  .setWidth(length).setMode(mode).build();
+            } else if (convertedType == ConvertedType.DECIMAL) {
+              return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+            }
+          default:
+            throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+        }
+      case REQUIRED:
+        switch (primitiveTypeName) {
+          case BINARY:
+            if (convertedType == null) {
+              return Types.required(TypeProtos.MinorType.VARBINARY);
+            }
+            switch (convertedType) {
+              case UTF8:
+                return Types.required(MinorType.VARCHAR);
+              case DECIMAL:
+                return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case INT64:
+            if (convertedType == null) {
+              return Types.required(MinorType.BIGINT);
+            }
+            switch(convertedType) {
+              case DECIMAL:
+                return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+              case FINETIME:
+                throw new UnsupportedOperationException();
+              case TIMESTAMP:
+                return Types.required(MinorType.TIMESTAMP);
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case INT32:
+            if (convertedType == null) {
+              return Types.required(MinorType.INT);
+            }
+            switch(convertedType) {
+              case DECIMAL:
+                return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+              case DATE:
+                return Types.required(MinorType.DATE);
+              case TIME:
+                return Types.required(MinorType.TIME);
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case BOOLEAN:
+            return Types.required(TypeProtos.MinorType.BIT);
+          case FLOAT:
+            return Types.required(TypeProtos.MinorType.FLOAT4);
+          case DOUBLE:
+            return Types.required(TypeProtos.MinorType.FLOAT8);
+          // Neither of these is supported by the parquet library yet (7/3/13),
+          // but they are declared here for when support is added
+          case INT96:
+            return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+                .setMode(mode).build();
+          case FIXED_LEN_BYTE_ARRAY:
+            if (convertedType == null) {
+              checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+              return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+                  .setWidth(length).setMode(mode).build();
+            } else if (convertedType == ConvertedType.DECIMAL) {
+              return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+            }
+          default:
+            throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+        }
+      case REPEATED:
+        switch (primitiveTypeName) {
+          case BINARY:
+            if (convertedType == null) {
+              return Types.repeated(TypeProtos.MinorType.VARBINARY);
+            }
+            switch (convertedType) {
+              case UTF8:
+                return Types.repeated(MinorType.VARCHAR);
+              case DECIMAL:
+                return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case INT64:
+            if (convertedType == null) {
+              return Types.repeated(MinorType.BIGINT);
+            }
+            switch(convertedType) {
+              case DECIMAL:
+                return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+              case FINETIME:
+                throw new UnsupportedOperationException();
+              case TIMESTAMP:
+                return Types.repeated(MinorType.TIMESTAMP);
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case INT32:
+            if (convertedType == null) {
+              return Types.repeated(MinorType.INT);
+            }
+            switch(convertedType) {
+              case DECIMAL:
+                return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+              case DATE:
+                return Types.repeated(MinorType.DATE);
+              case TIME:
+                return Types.repeated(MinorType.TIME);
+              default:
+                throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+            }
+          case BOOLEAN:
+            return Types.repeated(TypeProtos.MinorType.BIT);
+          case FLOAT:
+            return Types.repeated(TypeProtos.MinorType.FLOAT4);
+          case DOUBLE:
+            return Types.repeated(TypeProtos.MinorType.FLOAT8);
+          // Neither of these is supported by the parquet library yet (7/3/13),
+          // but they are declared here for when support is added
+          case INT96:
+            return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+                .setMode(mode).build();
+          case FIXED_LEN_BYTE_ARRAY:
+            if (convertedType == null) {
+              checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+              return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+                  .setWidth(length).setMode(mode).build();
+            } else if (convertedType == ConvertedType.DECIMAL) {
+              return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+            }
+          default:
+            throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+        }
+    }
+    throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode);
+  }
+}
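
The conversion above is driven entirely by the Parquet schema metadata (primitive type, data mode, and optional converted type). A minimal usage sketch follows; the SchemaElement setters (setName, setConverted_type) are assumed from the Thrift-generated parquet.format classes, and the column name is hypothetical.

import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetToDrillTypeConverter;
import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.schema.PrimitiveType;

public class TypeConversionSketch {
  public static void main(String[] args) {
    // Hypothetical schema element: a BINARY column annotated as UTF8.
    SchemaElement element = new SchemaElement();
    element.setName("l_comment");                   // assumed Thrift-generated setter
    element.setConverted_type(ConvertedType.UTF8);  // assumed Thrift-generated setter

    // The length argument is only consulted for FIXED_LEN_BYTE_ARRAY, so 0 is fine here.
    TypeProtos.MajorType type = ParquetToDrillTypeConverter.toMajorType(
        PrimitiveType.PrimitiveTypeName.BINARY, 0, TypeProtos.DataMode.OPTIONAL, element);
    System.out.println(type.getMinorType());        // expected: VARCHAR
  }
}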

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
new file mode 100644
index 0000000..409f17d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.List;
+
+public class VarLenBinaryReader {
+
+  ParquetRecordReader parentReader;
+  final List<VarLengthColumn> columns;
+
+  public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns){
+    this.parentReader = parentReader;
+    this.columns = columns;
+  }
+
+  /**
+   * Reads as many variable length values as possible.
+   *
+   * @param recordsToReadInThisPass - the number of records recommended for reading from the reader
+   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from
+   * @return - the number of variable length records read in this pass
+   * @throws IOException
+   */
+  public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+
+    long recordsReadInCurrentPass = 0;
+    int lengthVarFieldsInCurrentRecord;
+    long totalVariableLengthData = 0;
+    boolean exitLengthDeterminingLoop = false;
+    // write the first 0 offset
+    for (VarLengthColumn columnReader : columns) {
+      columnReader.reset();
+    }
+
+    do {
+      lengthVarFieldsInCurrentRecord = 0;
+      for (VarLengthColumn columnReader : columns) {
+        if ( ! exitLengthDeterminingLoop )
+          exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+        else
+          break;
+      }
+      // check that the next record will fit in the batch
+      if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData
+          + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()){
+        break;
+      }
+      for (VarLengthColumn columnReader : columns ) {
+        columnReader.updateReadyToReadPosition();
+        columnReader.currDefLevel = -1;
+      }
+      recordsReadInCurrentPass++;
+      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
+    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+
+    for (VarLengthColumn columnReader : columns) {
+      columnReader.readRecords(columnReader.pageReader.valuesReadyToRead);
+    }
+    for (VarLengthColumn columnReader : columns) {
+      columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
+    }
+    return recordsReadInCurrentPass;
+  }
+}
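
The length-determining loop above admits another record only while the projected batch size stays within budget. A standalone sketch of that test with illustrative numbers (in the reader, the per-record fixed width and the budget come from parentReader.getBitWidthAllFixedFields() and parentReader.getBatchSize()):

public class BatchFitSketch {
  // Mirrors the check in readFields(): fixed-width space for the records read so far plus
  // one more, plus the variable-length bytes already buffered, plus the bytes needed by the
  // current record, must not exceed the batch size.
  static boolean recordFits(long recordsReadSoFar, long fixedWidthPerRecord,
                            long totalVariableLengthData, long lengthVarFieldsInCurrentRecord,
                            long batchSize) {
    return (recordsReadSoFar + 1) * fixedWidthPerRecord
        + totalVariableLengthData + lengthVarFieldsInCurrentRecord <= batchSize;
  }

  public static void main(String[] args) {
    // Illustrative numbers only: 1000 records read, 16 fixed-width units per record,
    // 30000 variable-length bytes buffered, 50 more in the current record, 64K budget.
    System.out.println(recordFits(1000, 16, 30000, 50, 64 * 1024)); // true
  }
}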

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
new file mode 100644
index 0000000..14ee631
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.Encoding;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
+
+import java.io.IOException;
+
+public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class);
+
+  Binary currDictVal;
+
+  VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+                  SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+        usingDictionary = true;
+      }
+      else {
+        usingDictionary = false;
+      }
+  }
+
+  protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
+    return readAndStoreValueSizeInformation();
+  }
+
+  public void reset() {
+    super.reset();
+    pageReader.valuesReadyToRead = 0;
+  }
+
+  protected abstract boolean readAndStoreValueSizeInformation() throws IOException;
+
+  public abstract boolean skipReadyToReadPositionUpdate();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
new file mode 100644
index 0000000..979e8c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -0,0 +1,294 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.math.BigDecimal;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+public class VarLengthColumnReaders {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumnReaders.class);
+
+  public static class Decimal28Column extends VarLengthValuesColumn<Decimal28SparseVector> {
+
+    protected Decimal28SparseVector decimal28Vector;
+
+    Decimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal28SparseVector v,
+                   SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      this.decimal28Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal28SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      if (index >= decimal28Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return decimal28Vector.getData().capacity();
+    }
+  }
+
+  public static class NullableDecimal28Column extends NullableVarLengthValuesColumn<NullableDecimal28SparseVector> {
+
+    protected NullableDecimal28SparseVector nullableDecimal28Vector;
+
+    NullableDecimal28Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal28SparseVector v,
+                    SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      nullableDecimal28Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal28SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      if (index >= nullableDecimal28Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal28Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
+      nullableDecimal28Vector.getMutator().setIndexDefined(index);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableDecimal28Vector.getData().capacity();
+    }
+  }
+
+  public static class Decimal38Column extends VarLengthValuesColumn<Decimal38SparseVector> {
+
+    protected Decimal38SparseVector decimal38Vector;
+
+    Decimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal38SparseVector v,
+                    SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      decimal38Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal38SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      if (index >= decimal38Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return decimal38Vector.getData().capacity();
+    }
+  }
+
+  public static class NullableDecimal38Column extends NullableVarLengthValuesColumn<NullableDecimal38SparseVector> {
+
+    protected NullableDecimal38SparseVector nullableDecimal38Vector;
+
+    NullableDecimal38Column(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal38SparseVector v,
+                            SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      nullableDecimal38Vector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      int width = Decimal38SparseHolder.WIDTH;
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      if (index >= nullableDecimal38Vector.getValueCapacity()) {
+        return false;
+      }
+      DecimalUtility.getSparseFromBigDecimal(intermediate, nullableDecimal38Vector.getData(), index * width, schemaElement.getScale(),
+              schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
+      nullableDecimal38Vector.getMutator().setIndexDefined(index);
+      return true;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableDecimal38Vector.getData().capacity();
+    }
+  }
+
+
+  public static class VarCharColumn extends VarLengthValuesColumn<VarCharVector> {
+
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected VarCharVector varCharVector;
+
+    VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
+                  SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      varCharVector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      boolean success;
+      if(index >= varCharVector.getValueCapacity()) return false;
+
+      if (usingDictionary) {
+        success = varCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+            0, currDictValToWrite.length());
+      }
+      else {
+        success = varCharVector.getMutator().setSafe(index, bytes, start, length);
+      }
+      return success;
+    }
+
+    @Override
+    public int capacity() {
+      return varCharVector.getData().capacity();
+    }
+  }
+
+  public static class NullableVarCharColumn extends NullableVarLengthValuesColumn<NullableVarCharVector> {
+
+    int nullsRead;
+    boolean currentValNull = false;
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected NullableVarCharVector nullableVarCharVector;
+
+    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                          ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
+                          SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      nullableVarCharVector = v;
+    }
+
+    public boolean setSafe(int index, byte[] value, int start, int length) {
+      boolean success;
+      if(index >= nullableVarCharVector.getValueCapacity()) return false;
+
+      if (usingDictionary) {
+        success = nullableVarCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+            0, currDictValToWrite.length());
+      }
+      else {
+        success = nullableVarCharVector.getMutator().setSafe(index, value, start, length);
+      }
+      return success;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableVarCharVector.getData().capacity();
+    }
+  }
+
+  public static class VarBinaryColumn extends VarLengthValuesColumn<VarBinaryVector> {
+
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected VarBinaryVector varBinaryVector;
+
+    VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
+                    SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      varBinaryVector = v;
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      boolean success;
+      if(index >= varBinaryVector.getValueCapacity()) return false;
+
+      if (usingDictionary) {
+        success = varBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+            0, currDictValToWrite.length());
+      }
+      else {
+        success = varBinaryVector.getMutator().setSafe(index, bytes, start, length);
+      }
+      return success;
+    }
+
+    @Override
+    public int capacity() {
+      return varBinaryVector.getData().capacity();
+    }
+  }
+
+  public static class NullableVarBinaryColumn extends NullableVarLengthValuesColumn<NullableVarBinaryVector> {
+
+    int nullsRead;
+    boolean currentValNull = false;
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected org.apache.drill.exec.vector.NullableVarBinaryVector nullableVarBinaryVector;
+
+    NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
+                            SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      nullableVarBinaryVector = v;
+    }
+
+    public boolean setSafe(int index, byte[] value, int start, int length) {
+      boolean success;
+      if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
+
+      if (usingDictionary) {
+        success = nullableVarBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+            0, currDictValToWrite.length());
+      }
+      else {
+        success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length);
+      }
+      return  success;
+    }
+
+    @Override
+    public int capacity() {
+      return nullableVarBinaryVector.getData().capacity();
+    }
+
+  }
+}
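
All of the setSafe implementations above share one contract: instead of throwing when the target vector runs out of room, they return false so the reader can stop mid-batch, flush, and resume. A minimal caller-side sketch of that pattern (the interface and names here are illustrative, not Drill's):

public class SetSafeContractSketch {
  interface VarLenWriter {
    boolean setSafe(int index, byte[] bytes, int start, int length);
  }

  // Writes values until the vector reports it is out of capacity; returns how many fit.
  static int writeUntilFull(VarLenWriter writer, byte[][] values) {
    int written = 0;
    for (byte[] v : values) {
      if (!writer.setSafe(written, v, 0, v.length)) {
        break; // out of capacity: the caller would flush the batch and resume here
      }
      written++;
    }
    return written;
  }

  public static void main(String[] args) {
    // Toy writer with room for only two values.
    VarLenWriter writer = (index, bytes, start, length) -> index < 2;
    byte[][] values = { "a".getBytes(), "bb".getBytes(), "ccc".getBytes() };
    System.out.println(writeUntilFull(writer, values)); // 2
  }
}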

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
new file mode 100644
index 0000000..092c186
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.Encoding;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
+
+import java.io.IOException;
+
+public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLengthColumn {
+
+  Binary currLengthDeterminingDictVal;
+  Binary currDictValToWrite;
+  VariableWidthVector variableWidthVector;
+
+  VarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+                        SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    variableWidthVector = (VariableWidthVector) valueVec;
+    if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+      usingDictionary = true;
+    }
+    else {
+      usingDictionary = false;
+    }
+  }
+
+  public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
+
+  @Override
+  protected void readField(long recordToRead) {
+    dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
+    // again, I am re-purposing the unused field here, it is a length in BYTES, not bits
+    boolean success = setSafe((int) valuesReadInCurrentPass, pageReader.pageDataByteArray,
+        (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
+    assert success;
+    updatePosition();
+  }
+
+  public void updateReadyToReadPosition() {
+    pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4;
+    pageReader.valuesReadyToRead++;
+    currLengthDeterminingDictVal = null;
+  }
+
+  public void updatePosition() {
+    pageReader.readPosInBytes += dataTypeLengthInBits + 4;
+    bytesReadInCurrentPass += dataTypeLengthInBits;
+    valuesReadInCurrentPass++;
+  }
+
+  public boolean skipReadyToReadPositionUpdate() {
+    return false;
+  }
+
+  protected boolean readAndStoreValueSizeInformation() throws IOException {
+    // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+    dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+        (int) pageReader.readyToReadPosInBytes);
+
+    // this should not fail
+    if (!variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead,
+        dataTypeLengthInBits)) {
+      return true;
+    }
+    return false;
+  }
+
+}
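
readField() and readAndStoreValueSizeInformation() both walk the page assuming Parquet's PLAIN encoding of variable-length values: a 4-byte little-endian length followed by that many data bytes, which is why every position update advances by length + 4. A self-contained sketch of that layout:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class VarLenLayoutSketch {
  public static void main(String[] args) throws Exception {
    // Build a fake page fragment holding one value: [int32 length][value bytes].
    byte[] value = "hello".getBytes("UTF-8");
    byte[] page = ByteBuffer.allocate(4 + value.length).order(ByteOrder.LITTLE_ENDIAN)
        .putInt(value.length).put(value).array();

    // Read it back the way the column reader walks a page.
    int pos = 0;
    int len = ByteBuffer.wrap(page, pos, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    String decoded = new String(page, pos + 4, len, "UTF-8");
    pos += len + 4; // mirrors updatePosition(): skip the value plus its length prefix

    System.out.println(decoded + " nextPos=" + pos); // hello nextPos=9
  }
}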

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
index 9b0a6cd..6d03541 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
@@ -36,4 +36,17 @@ public interface RepeatedFixedWidthVector extends ValueVector{
    * @return The number of bytes of the buffer that were consumed.
    */
   public int load(int parentValueCount, int childValueCount, ByteBuf buf);
+
+  public abstract RepeatedAccessor getAccessor();
+
+  public abstract RepeatedMutator getMutator();
+
+  public interface RepeatedAccessor extends Accessor {
+    public int getGroupCount();
+  }
+  public interface RepeatedMutator extends Mutator {
+    public void setValueCounts(int parentValueCount, int childValueCount);
+    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount);
+    public BaseDataValueVector getDataVector();
+  }
 }
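
The new RepeatedAccessor/RepeatedMutator methods reflect the usual repeated-vector layout: an offsets array with one entry per group (plus one) pointing into a flat child data vector, so the group count and the child value count are tracked separately. A toy illustration of that layout, not Drill's implementation:

import java.util.ArrayList;
import java.util.List;

public class RepeatedLayoutSketch {
  private final List<Integer> offsets = new ArrayList<>(); // group boundaries into `data`
  private final List<Integer> data = new ArrayList<>();    // flattened child values

  RepeatedLayoutSketch() {
    offsets.add(0); // the first group starts at child index 0
  }

  // Append one group of child values; a real vector's "safe" mutator would return false
  // when its underlying buffers are full instead of growing them.
  void addGroup(int... childValues) {
    for (int v : childValues) {
      data.add(v);
    }
    offsets.add(data.size());
  }

  int getGroupCount()      { return offsets.size() - 1; } // cf. RepeatedAccessor.getGroupCount()
  int getChildValueCount() { return data.size(); }        // cf. setValueCounts(parent, child)

  public static void main(String[] args) {
    RepeatedLayoutSketch v = new RepeatedLayoutSketch();
    v.addGroup(1, 2, 3);
    v.addGroup();    // an empty group still advances the offsets array
    v.addGroup(4);
    System.out.println(v.getGroupCount() + " groups, " + v.getChildValueCount() + " values");
  }
}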

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
index bd03038..a2c884e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
@@ -35,6 +35,8 @@ public interface RepeatedVariableWidthVector extends ValueVector{
    */
   public int getByteCapacity();
 
+  public abstract RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
+
   /**
    * Load the records in the provided buffer based on the given number of values.
    * @param dataBytes   The number of bytes associated with the data array.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
index 2b07750..6660351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -44,5 +44,15 @@ public interface VariableWidthVector extends ValueVector{
    */
   public int load(int dataBytes, int valueCount, ByteBuf buf);
   
-  public abstract Mutator getMutator();
+  public abstract VariableWidthMutator getMutator();
+
+  public abstract VariableWidthAccessor getAccessor();
+
+  public interface VariableWidthAccessor extends Accessor {
+    public int getValueLength(int index);
+  }
+
+  public interface VariableWidthMutator extends Mutator {
+    public boolean setValueLengthSafe(int index, int length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index ef8aef8..d43bf59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -54,7 +55,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   private final UInt4Vector offsets;   // offsets to start of each record
   private final BufferAllocator allocator;
   private final Mutator mutator = new Mutator();
-  private final Accessor accessor = new Accessor();
+  private final RepeatedListAccessor accessor = new RepeatedListAccessor();
   private ValueVector vector;
   private final MaterializedField field;
   private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
@@ -112,7 +113,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
 
   }
 
-  public class Mutator implements ValueVector.Mutator{
+  public class Mutator implements ValueVector.Mutator, RepeatedMutator{
 
     public void startNewGroup(int index) {
       offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
@@ -151,9 +152,24 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     public void generateTestData(int values) {
     }
 
+    @Override
+    public void setValueCounts(int parentValueCount, int childValueCount) {
+      // TODO - determine if this should be implemented for this class
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
+      return false;  // TODO - setRepetitionAtIndexSafe is not yet implemented for RepeatedListVector
+    }
+
+    @Override
+    public BaseDataValueVector getDataVector() {
+      return null;  // TODO - getDataVector is not yet implemented for RepeatedListVector
+    }
   }
 
-  public class Accessor implements ValueVector.Accessor {
+  public class RepeatedListAccessor implements RepeatedAccessor{
 
     @Override
     public Object getObject(int index) {
@@ -211,6 +227,10 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
       return reader;
     }
 
+    @Override
+    public int getGroupCount() {
+      return size();
+    }
   }
 
   @Override
@@ -315,7 +335,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   }
 
   @Override
-  public Accessor getAccessor() {
+  public RepeatedListAccessor getAccessor() {
     return accessor;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index f05ab1b..30f5fc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -50,6 +51,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class RepeatedMapVector extends AbstractContainerVector implements RepeatedFixedWidthVector {
+
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
 
   public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
@@ -59,7 +61,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
   private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
   private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
-  private final Accessor accessor = new Accessor();
+  private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
   private final Mutator mutator = new Mutator();
   private final BufferAllocator allocator;
   private final MaterializedField field;
@@ -278,7 +280,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   }
 
   @Override
-  public Accessor getAccessor() {
+  public RepeatedMapAccessor getAccessor() {
     return accessor;
   }
 
@@ -349,7 +351,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     return mutator;
   }
 
-  public class Accessor implements ValueVector.Accessor{
+  public class RepeatedMapAccessor implements RepeatedAccessor {
 
     @Override
     public Object getObject(int index) {
@@ -414,6 +416,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
       return reader;
     }
 
+    @Override
+    public int getGroupCount() {
+      return size();
+    }
   }
 
   private void populateEmpties(int groupCount){
@@ -424,7 +430,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     lastSet = groupCount - 1;
   }
 
-  public class Mutator implements ValueVector.Mutator{
+  public class Mutator implements ValueVector.Mutator, RepeatedMutator {
 
     public void startNewGroup(int index) {
       populateEmpties(index);
@@ -458,6 +464,21 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     public void generateTestData(int values) {
     }
 
+    @Override
+    public void setValueCounts(int parentValueCount, int childValueCount) {
+      // TODO - determine if this should be implemented for this class
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
+      return false;  // TODO - setRepetitionAtIndexSafe is not yet implemented for RepeatedMapVector
+    }
+
+    @Override
+    public BaseDataValueVector getDataVector() {
+      return null;  // TODO - getDataVector is not yet implemented for RepeatedMapVector
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index aa2b66f..1cb0d06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -39,14 +40,13 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore
+import java.io.UnsupportedEncodingException;
+
 public class TestParquetWriter extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetWriter.class);
 
   static FileSystem fs;
 
-  private static final String EMPLOYEE_PARQUET_PATH = "employee_parquet";
-
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
@@ -59,7 +59,7 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testSimple() throws Exception {
     String selection = "*";
     String inputTable = "cp.`employee.json`";
-    runTestAndValidate(selection, selection, inputTable, EMPLOYEE_PARQUET_PATH);
+    runTestAndValidate(selection, selection, inputTable, "employee_parquet");
   }
 
   @Test
@@ -90,24 +90,13 @@ public class TestParquetWriter extends BaseTestQuery {
   @Test
   public void testTPCHReadWrite1_date_convertedType() throws Exception {
     String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
-        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as COMMITDATE, cast(L_RECEIPTDATE as DATE) AS RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
     String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
-        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,L_COMMITDATE ,L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
     String inputTable = "cp.`tpch/lineitem.parquet`";
     runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet");
   }
 
-  // TODO file a JIRA for running this query with the projected column names the same as the originals, it failed with a deadbuf
-  // on the client, it appeared that the projection was sending batches out with a record count but a deadbuf
-  /*
-  String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
-      "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
-  String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
-      "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
-      */
-  // this is rather odd, I can select the data out fo parquet and project it to cast the date fields
-  // this stores all of the data correctly, but when I got to read it out again with the query that created it (with redudant casts I beleive) it has
-  // everything but the cast date columns as nulls
   @Test
   public void testTPCHReadWrite2() throws Exception {
     String inputTable = "cp.`tpch/customer.parquet`";
@@ -144,34 +133,65 @@ public class TestParquetWriter extends BaseTestQuery {
     runTestAndValidate("*", "*", inputTable, "region_parquet");
   }
 
-  // This test fails an asset in OperatorStats intermittently
   @Test
   public void testTPCHReadWrite8() throws Exception {
     String inputTable = "cp.`tpch/supplier.parquet`";
     runTestAndValidate("*", "*", inputTable, "supplier_parquet");
   }
 
+  // Working toward an exhaustive test of the format, including all convertedTypes.
+  // Interval will not be supported for Beta as of the current schedule.
+  // Types left out:
+  // "TIMESTAMPTZ_col"
+  @Test
+  public void testRepeated() throws Exception {
+    String inputTable = "cp.`parquet/basic_repeated.json`";
+    runTestAndValidate("*", "*", inputTable, "basic_repeated");
+  }
+
+  // TODO - this is failing due to the parquet behavior of allowing repeated values to reach across
+  // pages. This broke our reading model a bit, but it is possible to work around.
+  @Test
+  public void testRepeatedDouble() throws Exception {
+    String inputTable = "cp.`parquet/repeated_double_data.json`";
+    runTestAndValidate("*", "*", inputTable, "repeated_double_parquet");
+  }
+
+  @Test
+  public void testRepeatedLong() throws Exception {
+    String inputTable = "cp.`parquet/repeated_integer_data.json`";
+    runTestAndValidate("*", "*", inputTable, "repeated_int_parquet");
+  }
+
+  @Test
+  public void testRepeatedBool() throws Exception {
+    String inputTable = "cp.`parquet/repeated_bool_data.json`";
+    runTestAndValidate("*", "*", inputTable, "repeated_bool_parquet");
+  }
+
+  @Test
+  public void testNullReadWrite() throws Exception {
+    String inputTable = "cp.`parquet/null_test_data.json`";
+    runTestAndValidate("*", "*", inputTable, "nullable_test");
+  }
+
+  @Ignore // fails intermittently when run with other tests, a patch in DRILL
   @Test
-  @Ignore
   public void testDecimal() throws Exception {
     String selection = "cast(salary as decimal(8,2)) as decimal8, cast(salary as decimal(15,2)) as decimal15, " +
         "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38";
     String validateSelection = "decimal8, decimal15, decimal24, decimal38";
     String inputTable = "cp.`employee.json`";
-    runTestAndValidate(selection, validateSelection, inputTable, EMPLOYEE_PARQUET_PATH);
+    runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
   }
 
-  // TODO - ask jacques about OperatorStats
-  // this is also experiencing the same failure as the 8th tpch dataset test above when run with the rest of the tests
-  // in this class all at once, not sure if this is IDE related for resource management or something that should be looked
-  // at.
   @Test
   public void testMulipleRowGroups() throws Exception {
     try {
       //test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 1*1024*1024));
-      String selection = "*";
+      String selection = "mi";
       String inputTable = "cp.`customer.json`";
-      runTestAndValidate(selection, selection, inputTable, EMPLOYEE_PARQUET_PATH);
+      runTestAndValidate(selection, selection, inputTable, "foodmart_customer_parquet");
     } finally {
       test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 512*1024*1024));
     }
@@ -183,7 +203,7 @@ public class TestParquetWriter extends BaseTestQuery {
     String selection = "cast(hire_date as DATE) as hire_date";
     String validateSelection = "hire_date";
     String inputTable = "cp.`employee.json`";
-    runTestAndValidate(selection, validateSelection, inputTable, EMPLOYEE_PARQUET_PATH);
+    runTestAndValidate(selection, validateSelection, inputTable, "foodmart_employee_parquet");
   }
 
   public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception {
@@ -219,12 +239,7 @@ public class TestParquetWriter extends BaseTestQuery {
       for (int i = 0; i < loader.getRecordCount(); i++) {
         HashMap<String, Object> record = new HashMap<>();
         for (VectorWrapper w : loader) {
-          Object obj = null;
-          try {
-            obj = w.getValueVector().getAccessor().getObject(i);
-          } catch (Exception ex) {
-            throw ex;
-          }
+          Object obj = w.getValueVector().getAccessor().getObject(i);
           if (obj != null) {
             if (obj instanceof Text) {
               obj = obj.toString();
@@ -235,6 +250,7 @@ public class TestParquetWriter extends BaseTestQuery {
             else if (obj instanceof byte[]) {
               obj = new String((byte[]) obj, "UTF-8");
             }
+            record.put(w.getField().toExpr(), obj);
           }
           record.put(w.getField().toExpr(), obj);
         }
@@ -252,13 +268,15 @@ public class TestParquetWriter extends BaseTestQuery {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
     addToMaterializedResults(expectedRecords, expected, loader, schema);
     addToMaterializedResults(actualRecords, result, loader, schema);
-    Assert.assertEquals("Different number of objects returned", expectedRecords.size(), actualRecords.size());
+    Assert.assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size());
 
     String missing = "";
     int i = 0;
+    int counter = 0;
     int missmatch;
     for (Map<String, Object> record : expectedRecords) {
       missmatch = 0;
+      counter++;
       for (String column : record.keySet()) {
         if (  actualRecords.get(i).get(column) == null && expectedRecords.get(i).get(column) == null ) {
           continue;
@@ -267,7 +285,7 @@ public class TestParquetWriter extends BaseTestQuery {
           continue;
         if ( (actualRecords.get(i).get(column) == null && record.get(column) == null) || ! actualRecords.get(i).get(column).equals(record.get(column))) {
           missmatch++;
-          System.out.println( i + " " + column + "[ex: " + record.get(column) + ", actual:" + actualRecords.get(i).get(column) + "]");
+          System.out.println( counter + " " + column + "[ex: " + record.get(column) + ", actual:" + actualRecords.get(i).get(column) + "]");
         }
       }
       if ( ! actualRecords.remove(record)) {

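The validation code above materializes every batch into per-record maps and then checks the expected and actual record sets against each other, removing matched records as it goes. A standalone sketch of that comparison idea (treating both sets as unordered collections of column-name-to-value maps; the class and method names here are illustrative, not Drill's) might look like:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class RecordSetComparator {

      // Checks two materialized record sets for multiset equality: every expected
      // record (a map of column name to value) must match exactly one actual
      // record, which is removed once it has been matched.
      public static List<String> compare(List<Map<String, Object>> expected,
                                         List<Map<String, Object>> actual) {
        List<String> problems = new ArrayList<>();
        if (expected.size() != actual.size()) {
          problems.add("Different number of records returned: expected "
              + expected.size() + ", actual " + actual.size());
        }
        List<Map<String, Object>> remaining = new ArrayList<>(actual);
        for (Map<String, Object> record : expected) {
          if (!remaining.remove(record)) {
            problems.add("Expected record not found in actual results: " + record);
          }
        }
        return problems;
      }
    }

Returning the list of problems instead of asserting inline makes it possible to report every unmatched record in a single run, which is the same motivation as the mismatch counter added to the test above.
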
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 3e679bb..2193233 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import static org.apache.drill.exec.store.parquet.TestFileGenerator.intVals;
 import static org.apache.drill.exec.store.parquet.TestFileGenerator.populateFieldInfoMap;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -46,16 +45,17 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.proto.BitControl;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.CachedSingleFileSystem;
 import org.apache.drill.exec.store.TestOutputMutator;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -127,7 +127,15 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     readEntries = "\"/tmp/lineitem_null_dict.parquet\"";
 
     String planText = Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
-    testParquetFullEngineLocalText(planText, fileName, 1, 1, 100000, false);
+    //testParquetFullEngineLocalText(planText, fileName, 1, 1, 100000, false);
+
+    testFull(QueryType.SQL, "select L_RECEIPTDATE from dfs.`/tmp/lineitem_null_dict.parquet`", "", 1, 1, 100000, false);
+  }
+
+  @Ignore
+  @Test
+  public void testDictionaryError_419() throws Exception {
+    testFull(QueryType.SQL, "select c_address from dfs.`/tmp/customer_snappyimpala_drill_419.parquet`", "", 1, 1, 150000, false);
   }
 
   @Test
@@ -265,7 +273,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   @Test
   public void testMultipleRowGroups() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(3, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(2, 300, DEFAULT_BYTES_PER_PAGE, fields);
     populateFieldInfoMap(props);
     testParquetFullEngineEventBased(true, "/parquet/parquet_scan_screen.json", "/tmp/test.parquet", 1, props);
   }
@@ -277,7 +285,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   @Test
   public void testNullableColumns() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 1500000, DEFAULT_BYTES_PER_PAGE, fields);
     Object[] boolVals = {true, null, null};
     props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
     testParquetFullEngineEventBased(false, "/parquet/parquet_nullable.json", "/tmp/nullable_test.parquet", 1, props);
@@ -289,23 +297,38 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
    * Tests the reading of nullable var length columns, runs the tests twice, once on a file that has
    * a converted type of UTF-8 to make sure it can be read
    */
-  public void testNullableColumnsVarLen() throws Exception {
+public void testNullableColumnsVarLen() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
     byte[] val = {'b'};
     byte[] val2 = {'b', '2'};
     byte[] val3 = {'b', '3'};
     byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
-    Object[] boolVals = { val, val2, val4};
-    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
+    Object[] byteArrayVals = { val, val2, val4};
+    props.fields.put("a", new FieldInfo("boolean", "a", 1, byteArrayVals, TypeProtos.MinorType.BIT, props));
     testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props);
-    fields.clear();
+    HashMap<String, FieldInfo> fields2 = new HashMap<>();
     // pass strings instead of byte arrays
-    Object[] boolVals2 = { new org.apache.hadoop.io.Text("b"), new org.apache.hadoop.io.Text("b2"),
+    Object[] textVals = { new org.apache.hadoop.io.Text("b"), new org.apache.hadoop.io.Text("b2"),
         new org.apache.hadoop.io.Text("b3")};
-    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props));
+    ParquetTestProperties props2 = new ParquetTestProperties(1, 30000, DEFAULT_BYTES_PER_PAGE, fields2);
+    props2.fields.put("a", new FieldInfo("boolean", "a", 1, textVals, TypeProtos.MinorType.BIT, props2));
+    testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/varLen.parquet/a\"", "unused", 1, props2);
+
+  }
+
+  @Ignore
+  @Test
+  public void testFileWithNulls() throws Exception {
+    HashMap<String, FieldInfo> fields3 = new HashMap<>();
+    ParquetTestProperties props3 = new ParquetTestProperties(1, 3000, DEFAULT_BYTES_PER_PAGE, fields3);
+    // actually include null values
+    Object[] valuesWithNull = {new Text(""), new Text("longer string"), null};
+    props3.fields.put("a", new FieldInfo("boolean", "a", 1, valuesWithNull, TypeProtos.MinorType.BIT, props3));
     testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
-        "\"/tmp/varLen.parquet/a\"", "unused", 1, props);
+        "\"/tmp/nullable_with_nulls.parquet\"", "unused", 1, props3);
+
   }
 
   @Ignore
@@ -319,7 +342,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     props.fields.put("n_regionkey", null);
     props.fields.put("n_comment", null);
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json",
-        "\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, true);
+        "\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, QueryType.LOGICAL);
 
     fields = new HashMap<>();
     props = new ParquetTestProperties(1, 5, DEFAULT_BYTES_PER_PAGE, fields);
@@ -332,7 +355,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     props.fields.put("height", null);
     props.fields.put("hair_thickness", null);
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json",
-        "\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, true);
+        "\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, QueryType.LOGICAL);
   }
 
   @Test
@@ -352,25 +375,36 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
         "/tmp/test.parquet", i, props);
   }
 
+  @Ignore
   @Test
   public void testReadError_Drill_901() throws Exception {
     // select cast( L_COMMENT as varchar) from  dfs_test.`/tmp/drilltest/employee_parquet`
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(1, 120350, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 60175, DEFAULT_BYTES_PER_PAGE, fields);
     testParquetFullEngineEventBased(false, false, "/parquet/par_writer_test.json", null,
-        "unused, no file is generated", 1, props, false);
+        "unused, no file is generated", 1, props, QueryType.PHYSICAL);
   }
 
+  @Ignore
+  @Test
+  public void testReadError_Drill_839() throws Exception {
+    // select cast( L_COMMENT as varchar) from  dfs.`/tmp/drilltest/employee_parquet`
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(1, 150000, DEFAULT_BYTES_PER_PAGE, fields);
+    String readEntries = "\"/tmp/customer_nonull.parquet\"";
+    testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
+        "unused, no file is generated", 1, props, QueryType.LOGICAL);
+  }
 
   @Ignore
   @Test
   public void testReadBug_Drill_418() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 150000, DEFAULT_BYTES_PER_PAGE, fields);
     TestFileGenerator.populateDrill_418_fields(props);
     String readEntries = "\"/tmp/customer.plain.parquet\"";
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
-        "unused, no file is generated", 1, props, true);
+        "unused, no file is generated", 1, props, QueryType.LOGICAL);
   }
 
   // requires binary file generated by pig from TPCH data, also have to disable assert where data is coming in
@@ -378,35 +412,35 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   @Test
   public void testMultipleRowGroupsAndReadsPigError() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 1500000, DEFAULT_BYTES_PER_PAGE, fields);
     TestFileGenerator.populatePigTPCHCustomerFields(props);
     String readEntries = "\"/tmp/tpc-h/customer\"";
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
-        "unused, no file is generated", 1, props, true);
+        "unused, no file is generated", 1, props, QueryType.LOGICAL);
 
     fields = new HashMap();
-    props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields);
+    props = new ParquetTestProperties(1, 100000, DEFAULT_BYTES_PER_PAGE, fields);
     TestFileGenerator.populatePigTPCHSupplierFields(props);
     readEntries = "\"/tmp/tpc-h/supplier\"";
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
-        "unused, no file is generated", 1, props, true);
+        "unused, no file is generated", 1, props, QueryType.LOGICAL);
   }
 
   @Ignore
   @Test
   public void drill_958bugTest() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(5, 300000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(1, 2880404, DEFAULT_BYTES_PER_PAGE, fields);
     TestFileGenerator.populatePigTPCHCustomerFields(props);
     String readEntries = "\"/tmp/store_sales\"";
     testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json", readEntries,
-        "unused, no file is generated", 1, props, true);
+        "unused, no file is generated", 1, props, QueryType.LOGICAL);
   }
 
   @Test
   public void testMultipleRowGroupsEvent() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    ParquetTestProperties props = new ParquetTestProperties(2, 300, DEFAULT_BYTES_PER_PAGE, fields);
     populateFieldInfoMap(props);
     testParquetFullEngineEventBased(true, "/parquet/parquet_scan_screen.json", "/tmp/test.parquet", 1, props);
   }
@@ -434,7 +468,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     props.fields.put("bigInt", new FieldInfo("int64", "bigInt", 64, TestFileGenerator.longVals, TypeProtos.MinorType.BIGINT, props));
     props.fields.put("bin", new FieldInfo("binary", "bin", -1, TestFileGenerator.binVals, TypeProtos.MinorType.VARBINARY, props));
     props.fields.put("bin2", new FieldInfo("binary", "bin2", -1, TestFileGenerator.bin2Vals, TypeProtos.MinorType.VARBINARY, props));
-    testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, false);
+    testParquetFullEngineEventBased(true, false, "/parquet/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, QueryType.PHYSICAL);
   }
 
   public static void main(String[] args) throws Exception{
@@ -502,19 +536,19 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   public void testParquetFullEngineEventBased(boolean generateNew, String plan, String readEntries, String filename,
                                               int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{
     testParquetFullEngineEventBased(true, generateNew, plan, readEntries,filename,
-                                              numberOfTimesRead /* specified in json plan */, props, true);
+                                              numberOfTimesRead /* specified in json plan */, props, QueryType.LOGICAL);
   }
 
 
   // specific tests should call this method, but it is not marked as a test itself intentionally
   public void testParquetFullEngineEventBased(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{
-    testParquetFullEngineEventBased(true, generateNew, plan, null, filename, numberOfTimesRead, props, true);
+    testParquetFullEngineEventBased(true, generateNew, plan, null, filename, numberOfTimesRead, props, QueryType.LOGICAL);
   }
 
   // specific tests should call this method, but it is not marked as a test itself intentionally
   public void testParquetFullEngineEventBased(boolean testValues, boolean generateNew, String plan, String readEntries, String filename,
                                               int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props,
-                                              boolean runAsLogicalPlan) throws Exception{
+                                              QueryType queryType) throws Exception{
     if (generateNew) TestFileGenerator.generateParquetFile(filename, props);
 
     ParquetResultListener resultListener = new ParquetResultListener(getAllocator(), props, numberOfTimesRead, testValues);
@@ -524,11 +558,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     if (readEntries != null) {
       planText = planText.replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
     }
-    if (runAsLogicalPlan){
-      this.testWithListener(QueryType.LOGICAL, planText, resultListener);
-    }else{
-      this.testWithListener(QueryType.PHYSICAL, planText, resultListener);
-    }
+    this.testWithListener(queryType, planText, resultListener);
     resultListener.getResults();
     long D = System.nanoTime();
     System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));

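The helper refactor above replaces the boolean runAsLogicalPlan flag with the QueryType enum, so tests can submit LOGICAL, PHYSICAL, or SQL plans through one code path. A minimal sketch of the same flag-to-enum pattern, using a made-up Runner class rather than the Drill test harness, could be:

    public class Runner {

      public enum QueryType { LOGICAL, PHYSICAL, SQL }

      // Old style: a boolean can only distinguish two submission modes.
      @Deprecated
      public void run(String plan, boolean runAsLogicalPlan) {
        run(plan, runAsLogicalPlan ? QueryType.LOGICAL : QueryType.PHYSICAL);
      }

      // New style: the enum names the mode explicitly and leaves room for SQL
      // (or future modes) without stacking more boolean flags.
      public void run(String plan, QueryType queryType) {
        System.out.println("submitting " + queryType + " plan: " + plan);
      }
    }

Keeping a deprecated boolean overload that delegates to the enum version is one way to let existing call sites migrate gradually; the patch above instead updates the call sites directly.
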
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 4a0efc9..a624234 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.parquet;
 
 import static junit.framework.Assert.assertEquals;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
 
@@ -32,6 +33,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Strings;
@@ -60,7 +62,7 @@ public class ParquetResultListener implements UserResultsListener {
 
   @Override
   public void submissionFailed(RpcException ex) {
-    logger.debug("Submission failed.", ex);
+    logger.error("Submission failed.", ex);
     future.setException(ex);
   }
 
@@ -76,7 +78,9 @@ public class ParquetResultListener implements UserResultsListener {
       return;
     }
 
-    T val = (T) valueVector.getAccessor().getObject(index);
+    T val;
+    try {
+    val = (T) valueVector.getAccessor().getObject(index);
     if (val instanceof byte[]) {
       assert(Arrays.equals((byte[]) value, (byte[]) val));
     }
@@ -85,6 +89,9 @@ public class ParquetResultListener implements UserResultsListener {
     } else {
       assertEquals(value, val);
     }
+    } catch (Throwable ex) {
+      throw ex;
+    }
   }
 
   @Override
@@ -126,7 +133,15 @@ public class ParquetResultListener implements UserResultsListener {
       }
       for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
         if (ParquetRecordReaderTest.VERBOSE_DEBUG){
-          System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " ");
+          Object o = vv.getAccessor().getObject(j);
+          if (o instanceof byte[]) {
+            try {
+              o = new String((byte[])o, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          System.out.print(Strings.padStart(o + "", 20, ' ') + " ");
           System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
         }
         if (testValues){
@@ -164,7 +179,27 @@ public class ParquetResultListener implements UserResultsListener {
 
         for (VectorWrapper vw : batchLoader) {
           ValueVector v = vw.getValueVector();
-          System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " ");
+          Object o = v.getAccessor().getObject(i);
+          if (o instanceof byte[]) {
+            try {
+              // TODO - in the dictionary read error test there is some data that does not look correct.
+              // The output of our reader matches the values of the parquet-mr cat/head tools (no full comparison
+              // was made, but from a quick check of a few values it looked consistent).
+              // This might have gotten corrupted by pig somehow, or maybe this is just how the data is supposed to look.
+              // TODO - check this!!
+//              for (int k = 0; k < ((byte[])o).length; k++ ) {
+//                // check that the value at each position is a valid single character ascii value.
+//
+//                if (((byte[])o)[k] > 128) {
+//                  System.out.println("batch: " + batchCounter + " record: " + recordCount);
+//                }
+//              }
+              o = new String((byte[])o, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          System.out.print(Strings.padStart(o + "", 20, ' ') + " ");
         }
         System.out.println();
       }
@@ -174,7 +209,7 @@ public class ParquetResultListener implements UserResultsListener {
     if(result.getHeader().getIsLastChunk()){
       // ensure the right number of columns was returned, especially important to ensure selective column read is working
       if (testValues) {
-        assertEquals( "Unexpected number of output columns from parquet scan.", valuesChecked.keySet().size(), props.fields.keySet().size() );
+        assertEquals( "Unexpected number of output columns from parquet scan.", props.fields.keySet().size(), valuesChecked.keySet().size() );
       }
       for (String s : valuesChecked.keySet()) {
         try {

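The listener changes above decode byte[] cell values to UTF-8 text before padding and printing them in the verbose debug path. A small helper capturing that idea (the class and method names are hypothetical, not part of this patch) might be:

    import java.nio.charset.StandardCharsets;

    public final class DisplayUtil {

      private DisplayUtil() {}

      // Renders a vector cell for debug output: byte[] values are decoded as
      // UTF-8 text, everything else falls back to String.valueOf, so nulls
      // print as "null".
      public static String toDisplayString(Object cell) {
        if (cell instanceof byte[]) {
          return new String((byte[]) cell, StandardCharsets.UTF_8);
        }
        return String.valueOf(cell);
      }
    }

Using StandardCharsets.UTF_8 also sidesteps the checked UnsupportedEncodingException that the "UTF-8" string constant forces the listener code above to wrap in a RuntimeException.
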
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index 2d2a2ec..3c0287d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.RLE;
 
 import java.util.HashMap;
 
@@ -29,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 
 import parquet.bytes.BytesInput;
 import parquet.column.ColumnDescriptor;
+import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import parquet.hadoop.ParquetFileWriter;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.MessageType;
@@ -37,8 +39,6 @@ import parquet.schema.MessageTypeParser;
 public class TestFileGenerator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFileGenerator.class);
 
-  
-
   // 1 MB per page
   static int bytesPerPage = 1024 * 1024 * 1;
   // { 00000001, 00000010, 00000100, 00001000, 00010000, ... }
@@ -57,6 +57,9 @@ public class TestFileGenerator {
   static final Object[] binVals = { varLen1, varLen2, varLen3 };
   static final Object[] bin2Vals = { varLen3, varLen2, varLen1 };
 
+  // TODO - figure out what this should be set to; it should be based on the max nesting level.
+  public static final int MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS = 16;
+
   static void populateDrill_418_fields(ParquetTestProperties props){
 
     props.fields.put("cust_key", new FieldInfo("int32", "integer", 32, intVals, TypeProtos.MinorType.INT, props));
@@ -102,6 +105,34 @@ public class TestFileGenerator {
     props.fields.put("S_COMMENT", new FieldInfo("binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY, props));
   }
 
+  private static abstract class ValueProducer {
+
+    public abstract void reset();
+    public abstract Object getValue();
+  }
+
+  private static class ValueRepeaterProducer extends ValueProducer {
+
+    WrapAroundCounter position;
+    Object[] values;
+
+    public ValueRepeaterProducer(Object[] values) {
+      this.values = values;
+      position = new WrapAroundCounter(values.length);
+    }
+
+    @Override
+    public void reset() {
+      position.reset();
+    }
+
+    public Object getValue() {
+      Object ret = values[position.val];
+      position.increment();
+      return ret;
+    }
+  }
+
   public static void generateParquetFile(String filename, ParquetTestProperties props) throws Exception {
 
     int currentBooleanByte = 0;
@@ -133,7 +164,7 @@ public class TestFileGenerator {
     HashMap<String, Integer> columnValuesWritten = new HashMap();
     int valsWritten;
     for (int k = 0; k < props.numberRowGroups; k++){
-      w.startBlock(1);
+      w.startBlock(props.recordsPerRowGroup);
       currentBooleanByte = 0;
       booleanBitCounter.reset();
 
@@ -152,6 +183,8 @@ public class TestFileGenerator {
         w.startColumn(c1, props.recordsPerRowGroup, codec);
         int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages);
         byte[] bytes;
+        RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage);
+        RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage);
         // for variable length binary fields
         int bytesNeededToEncodeLength = 4;
         if ((int) fieldInfo.bitLength > 0) {
@@ -169,6 +202,8 @@ public class TestFileGenerator {
         int bytesWritten = 0;
         for (int z = 0; z < (int) fieldInfo.numberOfPages; z++, bytesWritten = 0) {
           for (int i = 0; i < valsPerPage; i++) {
+            repLevels.writeInteger(0);
+            defLevels.writeInteger(1);
             //System.out.print(i + ", " + (i % 25 == 0 ? "\n gen " + fieldInfo.name + ": " : ""));
             if (fieldInfo.values[0] instanceof Boolean) {
 
@@ -195,7 +230,13 @@ public class TestFileGenerator {
             }
 
           }
-          w.writeDataPage((int) (props.recordsPerRowGroup / (int) fieldInfo.numberOfPages), bytes.length, BytesInput.from(bytes), PLAIN, PLAIN, PLAIN);
+          byte[] fullPage = new byte[2 * 4 * valsPerPage + bytes.length];
+          byte[] repLevelBytes = repLevels.getBytes().toByteArray();
+          byte[] defLevelBytes = defLevels.getBytes().toByteArray();
+          System.arraycopy(bytes, 0, fullPage, 0, bytes.length);
+          System.arraycopy(repLevelBytes, 0, fullPage, bytes.length, repLevelBytes.length);
+          System.arraycopy(defLevelBytes, 0, fullPage, bytes.length + repLevelBytes.length, defLevelBytes.length);
+          w.writeDataPage( (props.recordsPerRowGroup / fieldInfo.numberOfPages), fullPage.length, BytesInput.from(fullPage), RLE, RLE, PLAIN);
           currentBooleanByte = 0;
         }
         w.endColumn();

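The generator changes above write a run-length-encoded repetition and definition level for every value and splice those byte runs into the data page alongside the plain-encoded values. A condensed sketch of that per-page assembly, assuming a flat non-null column (repetition level 0 and definition level 1 for each value) and following the byte ordering used in the generator above, could be:

    import java.io.IOException;

    import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;

    public class LevelPageSketch {

      // Illustrative upper bound on the level bit width, mirroring the constant
      // added to TestFileGenerator; a real writer would size this from the schema.
      private static final int MAX_LEVEL_BIT_WIDTH = 16;

      public static byte[] buildPage(byte[] plainValues, int valueCount) throws IOException {
        RunLengthBitPackingHybridValuesWriter repLevels =
            new RunLengthBitPackingHybridValuesWriter(MAX_LEVEL_BIT_WIDTH, valueCount);
        RunLengthBitPackingHybridValuesWriter defLevels =
            new RunLengthBitPackingHybridValuesWriter(MAX_LEVEL_BIT_WIDTH, valueCount);
        for (int i = 0; i < valueCount; i++) {
          repLevels.writeInteger(0);   // flat schema: nothing repeats
          defLevels.writeInteger(1);   // every value is defined (non-null)
        }
        byte[] rep = repLevels.getBytes().toByteArray();
        byte[] def = defLevels.getBytes().toByteArray();
        // Concatenate in the order the generator above uses: values, then
        // repetition levels, then definition levels.
        byte[] page = new byte[plainValues.length + rep.length + def.length];
        System.arraycopy(plainValues, 0, page, 0, plainValues.length);
        System.arraycopy(rep, 0, page, plainValues.length, rep.length);
        System.arraycopy(def, 0, page, plainValues.length + rep.length, def.length);
        return page;
      }
    }

How the reader side consumes this layout is outside the sketch; the point is only that each value now carries an explicit level pair, which is what the repeated reader in this change depends on.
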
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json b/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json
new file mode 100644
index 0000000..927cb52
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/alltypes_repeated.json
@@ -0,0 +1,28 @@
+{
+    "TINYINT_col" :              [ 1, 2, 3, 4, 5, -1, -2, -3, -4, -5, 10000, -10000 ],
+    "UINT1_col" :                [ 1, 2, 3, 4, 5, 10000 ],
+    "UINT2_col" :                [ 1, 2, 3, 4, 5, 10000 ],
+    "SMALLINT_col" :             [ 1, 2, 3, 4, 5, -1, -2, -3, -4, -5, 100000, -100000 ],
+    "INT_col" :                  [ 1, 2, 3, 4, 5, -1, -2, -3, -4, -5, 2147483647, -2147483648 ],
+    "UINT4_col" :                [ 1, 2, 3, 4, 5, 2147483700 ],
+    "FLOAT4_col" :               [ 1.0, 2.0, 3.0, 4.0, 5.0, 1000000000000.0, -1000000000000.0 ],
+    "TIME_col" :                 [ "2:30, "11:45", "12:00", 11:59", 23:59" ],
+    "DECIMAL9_col" :             [ "1.0", "2.0", "3.0", "4.0", "5.0", "100.100", "0.0000001" ],
+    "BIGINT_col" :               [ 1, 2, 3, 4, 5, 9223372036854775000, -9223372036854775000],
+    "UINT8_col" :                [ "1", "2", "3", "4", "5", "9223372036854778000" ],
+    "FLOAT8_col" :               [ 1.0, 2.0, 3.0, 4.0, 5.0, 10000000000000.0, -10000000000000.0 ],
+    "DATE_col":                  [ "1995-01-01", "1995-01-02", "1995-01-03", "1995-01-04", "1995-01-05" ],
+    "TIMESTAMP_col" :            [ "1995-01-01 01:00:00.000","1995-01-01 01:00:00.000", "1995-01-01 01:00:00.000", "1995-01-01 01:00:00.000" ],
+    "DECIMAL18_col" :            ["123456789.000000000", "11.123456789", "0.100000000", "-0.100400000", "-987654321.123456789", "-2.030100000"],
+    "INTERVALYEAR" :
+    "INTERVALDAY" :
+    "INTERVAL" :
+    "DECIMAL28DENSE_col",
+    "DECIMAL38DENSE_col",
+    "DECIMAL38SPARSE_col",
+    "DECIMAL28SPARSE_col",
+    "VARBINARY_col",
+    "VARCHAR_col",
+    "VAR16CHAR_col",
+    "BIT_col",
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/test/resources/parquet/basic_repeated.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/basic_repeated.json b/exec/java-exec/src/test/resources/parquet/basic_repeated.json
new file mode 100644
index 0000000..ae39685
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/basic_repeated.json
@@ -0,0 +1,9 @@
+{
+    "int32": [1,2]
+}
+{
+    "int32": []
+}
+{
+    "int32": [1]
+}

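basic_repeated.json above is a stream of concatenated root-level JSON objects, one per record, with a JSON array standing in for the repeated int32 column (including an empty array for a record with no values). A generic Jackson sketch for walking such a file (purely illustrative, assuming Jackson 2.6 or newer; Drill's own JSON reader is not shown in this patch) could be:

    import java.io.File;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.MappingIterator;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class RepeatedJsonSketch {

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // readValues() iterates a stream of concatenated root-level JSON objects,
        // the layout used by basic_repeated.json above.
        try (MappingIterator<JsonNode> records =
                 mapper.readerFor(JsonNode.class).readValues(new File("basic_repeated.json"))) {
          while (records.hasNext()) {
            JsonNode record = records.next();
            // the repeated column appears as a JSON array, possibly empty
            System.out.println("int32 values: " + record.path("int32"));
          }
        }
      }
    }
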
