drill-dev mailing list archives

From jacq...@apache.org
Subject [04/38] git commit: DRILL-315: Parquet/JSON selective column read, parquet performance evaluation code from Jacques. Requested columns are specified in physical plans, logical scan operations will be modified by the optimizer as it analyzes the rest of t
Date Tue, 04 Mar 2014 08:07:31 GMT
DRILL-315: Parquet/JSON selective column read; Parquet performance evaluation code from Jacques. Requested columns are specified in physical plans; logical scan operations will be modified by the optimizer as it analyzes the rest of the query graph to decide which columns are needed. This still needs to be added to the optimizer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2dfadc9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2dfadc9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2dfadc9d

Branch: refs/heads/master
Commit: 2dfadc9d9945c347fded28ec90af5d0842f91853
Parents: b3460af
Author: Jason Altekruse <altekrusejason@gmail.com>
Authored: Wed Feb 12 09:28:07 2014 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon Mar 3 23:22:16 2014 -0800

----------------------------------------------------------------------
 .../drill/common/expression/SchemaPath.java     |   4 +
 exec/java-exec/pom.xml                          |   4 +-
 .../drill/exec/store/json/JSONGroupScan.java    |  19 ++-
 .../drill/exec/store/json/JSONRecordReader.java |  29 ++++-
 .../exec/store/json/JSONScanBatchCreator.java   |   3 +-
 .../exec/store/json/JSONStorageEngine.java      |   3 +-
 .../drill/exec/store/json/JSONSubScan.java      |  16 ++-
 .../drill/exec/store/parquet/BitReader.java     |  10 +-
 .../exec/store/parquet/ColumnDataReader.java    |  90 +++++++++++++
 .../drill/exec/store/parquet/ColumnReader.java  |  64 +++++-----
 .../store/parquet/FixedByteAlignedReader.java   |   6 +-
 .../exec/store/parquet/NullableBitReader.java   |   3 +-
 .../store/parquet/NullableColumnReader.java     |   3 +-
 .../parquet/NullableFixedByteAlignedReader.java |  28 +++--
 .../exec/store/parquet/PageReadStatus.java      |  80 ++++++------
 .../exec/store/parquet/ParquetGroupScan.java    |  23 +++-
 .../exec/store/parquet/ParquetRecordReader.java | 126 +++++++++----------
 .../exec/store/parquet/ParquetRowGroupScan.java |  27 +++-
 .../store/parquet/ParquetScanBatchCreator.java  |  13 +-
 .../store/parquet/ParquetStorageEngine.java     |   5 +-
 .../exec/store/parquet/VarLenBinaryReader.java  |  11 +-
 .../drill/exec/store/JSONRecordReaderTest.java  |  50 +++++++-
 .../exec/store/ParquetRecordReaderTest.java     |  60 ++++++---
 .../test/resources/parquet_nullable_varlen.json |  41 ++++++
 .../parquet_selective_column_read.json          |  28 +++++
 25 files changed, 536 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
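
For orientation: the requested columns travel with the scan operators as a List<SchemaPath> (the new "columns" JSON property on JSONGroupScan, JSONSubScan, ParquetGroupScan and ParquetRowGroupScan below), and a null list means "read every column", which is what JSONStorageEngine passes until the optimizer learns to populate the list. A minimal sketch of building such a projection follows; it is not part of the patch and the field names are invented:

  import java.util.Arrays;
  import java.util.List;

  import org.apache.drill.common.expression.ExpressionPosition;
  import org.apache.drill.common.expression.SchemaPath;

  // Hypothetical projection: only these two fields would be materialized by the readers.
  List<SchemaPath> columns = Arrays.asList(
      new SchemaPath("integer", ExpressionPosition.UNKNOWN),
      new SchemaPath("float", ExpressionPosition.UNKNOWN));

  // Handed to a group scan, e.g. new JSONGroupScan(entries, engine, ref, columns);
  // passing null instead keeps the pre-patch behavior of reading everything.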


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 7c2dce6..76838dc 100644
--- a/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -63,6 +63,10 @@ public class SchemaPath extends LogicalExpressionBase {
     this.originalPath = path.originalPath;
     this.rootSegment = path.rootSegment;
   }
+
+  public SchemaPath(String str){
+    this(str, ExpressionPosition.UNKNOWN);
+  }
   
   public SchemaPath(CharSequence str, ExpressionPosition pos) {
     super(pos);
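
The added single-argument constructor simply defaults the expression position; for illustration (not from the patch, the field name is arbitrary), the two lines below are equivalent after this change:

  SchemaPath a = new SchemaPath("employee_id");
  SchemaPath b = new SchemaPath("employee_id", ExpressionPosition.UNKNOWN);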

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index a016afc..5c6fcfe 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -77,7 +77,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.0.1</version>
+      <version>1.2.8</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -92,7 +92,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.0.1</version>
+      <version>1.2.8</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
index 8988f44..4782d82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.ReadEntry;
@@ -51,15 +52,18 @@ public class JSONGroupScan extends AbstractGroupScan {
   private final OperatorCost cost;
   private final Size size;
   private final FieldReference ref;
+  private final List<SchemaPath> columns;
 
   @JsonCreator
   public JSONGroupScan(@JsonProperty("entries") List<ScanEntry> entries,
                        @JsonProperty("storageengine") JSONStorageEngineConfig storageEngineConfig,
-                       @JacksonInject StorageEngineRegistry engineRegistry, @JsonProperty("ref") FieldReference ref) throws ExecutionSetupException {
-    this(entries, (JSONStorageEngine) engineRegistry.getEngine(storageEngineConfig), ref);
+                       @JacksonInject StorageEngineRegistry engineRegistry, @JsonProperty("ref") FieldReference ref,
+                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    this(entries, (JSONStorageEngine) engineRegistry.getEngine(storageEngineConfig), ref, columns);
   }
 
-  public JSONGroupScan(List<ScanEntry> entries, JSONStorageEngine engine, FieldReference ref) {
+  public JSONGroupScan(List<ScanEntry> entries, JSONStorageEngine engine, FieldReference ref,
+                       List<SchemaPath> columns) {
     this.engine = engine;
     this.readEntries = entries;
     OperatorCost cost = new OperatorCost(0, 0, 0, 0);
@@ -71,6 +75,7 @@ public class JSONGroupScan extends AbstractGroupScan {
     this.cost = cost;
     this.size = size;
     this.ref = ref;
+    this.columns = columns;
   }
   
   @SuppressWarnings("unchecked")
@@ -97,7 +102,7 @@ public class JSONGroupScan extends AbstractGroupScan {
   @Override
   public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException{
     checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
-    return new JSONSubScan(mappings[minorFragmentId], engine, ref);
+    return new JSONSubScan(mappings[minorFragmentId], engine, ref, columns);
   }
 
   @Override
@@ -108,7 +113,7 @@ public class JSONGroupScan extends AbstractGroupScan {
   @Override
   @JsonIgnore
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new JSONGroupScan(readEntries, engine, ref);
+    return new JSONGroupScan(readEntries, engine, ref, columns);
   }
 
   public static class ScanEntry implements ReadEntry {
@@ -141,6 +146,10 @@ public class JSONGroupScan extends AbstractGroupScan {
     return readEntries.size();
   }
 
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
   @Override
   public OperatorCost getCost() {
     return cost;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
index 49aea22..11b972c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
@@ -90,18 +90,22 @@ public class JSONRecordReader implements RecordReader {
   private BufferAllocator allocator;
   private int batchSize;
   private final FieldReference ref;
+  private final List<SchemaPath> columns;
 
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize, FieldReference ref) {
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
+                          FieldReference ref, List<SchemaPath> columns) {
     this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;
     this.allocator = fragmentContext.getAllocator();
     this.batchSize = batchSize;
     valueVectorMap = Maps.newHashMap();
     this.ref = ref;
+    this.columns = columns;
   }
 
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, FieldReference ref) {
-    this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, ref);
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, FieldReference ref,
+                          List<SchemaPath> columns) {
+    this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, ref, columns);
   }
 
   private JsonParser getParser() {
@@ -209,6 +213,19 @@ public class JSONRecordReader implements RecordReader {
     return allocator;
   }
 
+  private boolean fieldSelected(String field){
+    SchemaPath sp = new SchemaPath(field, ExpressionPosition.UNKNOWN);
+    if (this.columns != null && this.columns.size() > 1){
+      for (SchemaPath expr : this.columns){
+        if ( sp.equals(expr)){
+          return true;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
   public static enum ReadType {
     ARRAY(END_ARRAY) {
       @Override
@@ -264,6 +281,12 @@ public class JSONRecordReader implements RecordReader {
         }
 
         String fieldName = parser.getCurrentName();
+        if ( fieldName != null && ! reader.fieldSelected(fieldName)){
+          // this field was not requested in the query
+          token = parser.nextToken();
+          colIndex += 1;
+          continue;
+        }
         MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
         ReadType readType = null;
         switch (token) {
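
The JSON reader now skips unrequested fields as it parses. The intended rule, sketched standalone below, is that a field is kept when no column list was supplied or when its SchemaPath appears in the list; note that the patch itself guards on columns.size() > 1 rather than a plain non-empty check, so treat this as an assumption about intent rather than a copy of the implementation:

  // Sketch of the selection rule; assumes a null or empty list means "keep everything".
  static boolean selected(List<SchemaPath> requested, String fieldName) {
    if (requested == null || requested.isEmpty()) {
      return true;                                  // no projection supplied: keep every field
    }
    SchemaPath candidate = new SchemaPath(fieldName, ExpressionPosition.UNKNOWN);
    for (SchemaPath requestedPath : requested) {
      if (candidate.equals(requestedPath)) {
        return true;                                // field was requested in the query
      }
    }
    return false;                                   // caller advances the parser past this field
  }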

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
index a4c00cc..c40cb47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
@@ -37,7 +37,8 @@ public class JSONScanBatchCreator implements BatchCreator<JSONSubScan> {
         List<JSONGroupScan.ScanEntry> entries = config.getReadEntries();
         List<RecordReader> readers = Lists.newArrayList();
         for (JSONGroupScan.ScanEntry e : entries) {
-            readers.add(new JSONRecordReader(context, e.getPath(), config.getStorageEngine().getFileSystem(), config.getRef()));
+            readers.add(new JSONRecordReader(context, e.getPath(), config.getStorageEngine().getFileSystem(), config.getRef(),
+                config.getColumns()));
         }
 
         return new ScanBatch(context, readers.iterator());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
index bd3bca5..e4f2070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.json;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStorageEngine;
@@ -65,7 +66,7 @@ public class JSONStorageEngine extends AbstractStorageEngine {
   @Override
   public JSONGroupScan getPhysicalScan(Scan scan) throws IOException {
     ArrayList<ScanEntry> readEntries = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<ArrayList<ScanEntry>>() {});
-    return new JSONGroupScan(readEntries, this, scan.getOutputReference());
+    return new JSONGroupScan(readEntries, this, scan.getOutputReference(), null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
index a04f906..92f6c0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractBase;
@@ -46,15 +47,19 @@ public class JSONSubScan extends AbstractBase implements SubScan {
   private final Size size;
   private final JSONStorageEngine storageEngine;
   private final FieldReference ref;
+  private final List<SchemaPath> columns;
 
   @JsonCreator
   public JSONSubScan(@JacksonInject StorageEngineRegistry registry,
                      @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
-                     @JsonProperty("readEntries") List<JSONGroupScan.ScanEntry> readEntries, @JsonProperty("ref") FieldReference ref) throws ExecutionSetupException {
-    this(readEntries, (JSONStorageEngine) registry.getEngine(engineConfig), ref);
+                     @JsonProperty("readEntries") List<JSONGroupScan.ScanEntry> readEntries,
+                     @JsonProperty("ref") FieldReference ref,
+                     @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    this(readEntries, (JSONStorageEngine) registry.getEngine(engineConfig), ref, columns);
   }
   
-  JSONSubScan(List<JSONGroupScan.ScanEntry> readEntries, JSONStorageEngine engine, FieldReference ref){
+  JSONSubScan(List<JSONGroupScan.ScanEntry> readEntries, JSONStorageEngine engine, FieldReference ref,
+              List<SchemaPath> columns){
     this.readEntries = readEntries;
     this.storageEngine = engine;
     OperatorCost cost = new OperatorCost(0, 0, 0, 0);
@@ -66,6 +71,7 @@ public class JSONSubScan extends AbstractBase implements SubScan {
     this.cost = cost;
     this.size = size;
     this.ref = ref;
+    this.columns = columns;
   }
   
   public FieldReference getRef() {
@@ -109,4 +115,8 @@ public class JSONSubScan extends AbstractBase implements SubScan {
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.emptyIterator();
   }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
index a3f9503..d0049c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
@@ -24,11 +25,12 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 public final class BitReader extends ColumnReader {
 
-  byte currentByte;
-  byte nextByte;
-
+  private byte currentByte;
+  private byte nextByte;
+  private byte[] bytes;
+  
   BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-            boolean fixedLength, ValueVector v) {
+            boolean fixedLength, ValueVector v) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
new file mode 100644
index 0000000..ec44747
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.bytes.BytesInput;
+import parquet.format.PageHeader;
+import parquet.format.Util;
+
+public class ColumnDataReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
+  
+  private final long endPosition;
+  private final FSDataInputStream input;
+  
+  public ColumnDataReader(FileSystem fs, Path path, long start, long length) throws IOException{
+    this.input = fs.open(path, 64 * 1024);
+    this.input.seek(start);
+    this.endPosition = start + length;
+  }
+  
+  public PageHeader readPageHeader() throws IOException{
+    return Util.readPageHeader(input);
+  }
+  
+  public BytesInput getPageAsBytesInput(int pageLength) throws IOException{
+    byte[] b = new byte[pageLength];
+    input.read(b);
+    return new HadoopBytesInput(b);
+  }
+  
+  public void clear(){
+    try{
+      input.close();
+    }catch(IOException ex){
+      logger.warn("Error while closing input stream.", ex);
+    }
+  }
+
+  public boolean hasRemainder() throws IOException{
+    return input.getPos() < endPosition;
+  }
+  
+  public class HadoopBytesInput extends BytesInput{
+
+    private final byte[] pageBytes;
+    
+    public HadoopBytesInput(byte[] pageBytes) {
+      super();
+      this.pageBytes = pageBytes;
+    }
+
+    @Override
+    public byte[] toByteArray() throws IOException {
+      return pageBytes;
+    }
+
+    @Override
+    public long size() {
+      return pageBytes.length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      out.write(pageBytes);
+    }
+    
+  }
+}
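
ColumnDataReader streams a single column chunk straight from HDFS, replacing the old approach of buffering an entire row group up front (see the ParquetRecordReader changes below). A rough usage sketch, not from the patch; the path and offsets are placeholders and would normally come from ColumnChunkMetaData, as PageReadStatus does below:

  import java.io.IOException;

  import org.apache.drill.exec.store.parquet.ColumnDataReader;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  import parquet.bytes.BytesInput;
  import parquet.format.PageHeader;

  class ColumnDataReaderSketch {
    void readColumnChunk(FileSystem fs) throws IOException {
      Path file = new Path("/tmp/example.parquet");   // placeholder path
      long firstDataPageOffset = 4;                   // normally ColumnChunkMetaData.getFirstDataPageOffset()
      long totalSize = 1024;                          // normally ColumnChunkMetaData.getTotalSize()

      ColumnDataReader reader = new ColumnDataReader(fs, file, firstDataPageOffset, totalSize);
      try {
        while (reader.hasRemainder()) {
          PageHeader pageHeader = reader.readPageHeader();
          BytesInput compressedPage = reader.getPageAsBytesInput(pageHeader.compressed_page_size);
          // decompress via the engine's CodecFactoryExposer and hand the page to a ColumnReader
        }
      } finally {
        reader.clear();                               // closes the underlying FSDataInputStream
      }
    }
  }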

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 0277d12..94ccbfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -18,9 +18,11 @@
 package org.apache.drill.exec.store.parquet;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.VectorHolder;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
 import parquet.column.ColumnDescriptor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.schema.PrimitiveType;
@@ -28,53 +30,57 @@ import parquet.schema.PrimitiveType;
 import java.io.IOException;
 
 public abstract class ColumnReader {
+  
+  final ParquetRecordReader parentReader;
+  
   // Value Vector for this column
-  VectorHolder valueVecHolder;
+  final VectorHolder valueVecHolder;
   // column description from the parquet library
-  ColumnDescriptor columnDescriptor;
+  final ColumnDescriptor columnDescriptor;
   // metadata of the column, from the parquet library
-  ColumnChunkMetaData columnChunkMetaData;
+  final ColumnChunkMetaData columnChunkMetaData;
   // status information on the current page
-  PageReadStatus pageReadStatus;
-
-  long readPositionInBuffer;
+  final PageReadStatus pageReadStatus;
 
   // quick reference to see if the field is fixed length (as this requires an instanceof)
-  boolean isFixedLength;
+  final boolean isFixedLength;
+
   // counter for the total number of values read from one or more pages
   // when a batch is filled all of these values should be the same for each column
   int totalValuesRead;
+  
   // counter for the values that have been read in this pass (a single call to the next() method)
   int valuesReadInCurrentPass;
+  
   // length of single data value in bits, if the length is fixed
   int dataTypeLengthInBits;
   int bytesReadInCurrentPass;
-  ParquetRecordReader parentReader;
 
-  ByteBuf vectorData;
+  protected ByteBuf vectorData;
 
   // variables for a single read pass
   long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
-  byte[] bytes;
 
-  ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, ValueVector v){
+  protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
     this.parentReader = parentReader;
-    if (allocateSize > 1) valueVecHolder = new VectorHolder(allocateSize, v);
-    else valueVecHolder = new VectorHolder(5000, v);
-
-    columnDescriptor = descriptor;
+    this.columnDescriptor = descriptor;
     this.columnChunkMetaData = columnChunkMetaData;
-    isFixedLength = fixedLength;
+    this.isFixedLength = fixedLength;
+
+    if (allocateSize > 1) {
+      valueVecHolder = new VectorHolder(allocateSize, v);
+    } else {
+      valueVecHolder = new VectorHolder(5000, v);
+    }
 
-    pageReadStatus = new PageReadStatus(this, parentReader.getRowGroupIndex(), parentReader.getBufferWithAllData());
 
-    if (parentReader.getRowGroupIndex() != 0) readPositionInBuffer = columnChunkMetaData.getFirstDataPageOffset() - 4;
-    else readPositionInBuffer = columnChunkMetaData.getFirstDataPageOffset();
+    this.pageReadStatus = new PageReadStatus(this, parentReader.fileSystem, parentReader.hadoopPath, columnChunkMetaData);
 
     if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
       dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
     }
+
   }
 
   public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
@@ -82,17 +88,16 @@ public abstract class ColumnReader {
     readLength = 0;
     readLengthInBits = 0;
     recordsReadInThisIteration = 0;
-    vectorData = ((BaseValueVector)valueVecHolder.getValueVector()).getData();
+    vectorData = ((BaseValueVector) valueVecHolder.getValueVector()).getData();
     do {
       // if no page has been read, or all of the records have been read out of a page, read the next one
-      if (pageReadStatus.currentPage == null
-          || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
+      if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
         if (!pageReadStatus.next()) {
           break;
         }
       }
 
-      readField( recordsToReadInThisPass, firstColumnStatus);
+      readField(recordsToReadInThisPass, firstColumnStatus);
 
       valuesReadInCurrentPass += recordsReadInThisIteration;
       totalValuesRead += recordsReadInThisIteration;
@@ -102,10 +107,13 @@ public abstract class ColumnReader {
       } else {
         pageReadStatus.readPosInBytes = readStartInBytes + readLength;
       }
-    }
-    while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
-    valueVecHolder.getValueVector().getMutator().setValueCount(
-        valuesReadInCurrentPass);
+    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
+    valueVecHolder.getValueVector().getMutator().setValueCount(valuesReadInCurrentPass);
+  }
+
+  public void clear() {
+    this.valueVecHolder.reset();
+    this.pageReadStatus.clear();
   }
 
   protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 7208cf7..3aae189 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
@@ -24,8 +25,11 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 public class FixedByteAlignedReader extends ColumnReader {
 
+  private byte[] bytes;
+
+  
   FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                         boolean fixedLength, ValueVector v) {
+                         boolean fixedLength, ValueVector v) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
index 7da45cc..1dde7c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.NullableBitVector;
@@ -37,7 +38,7 @@ import java.io.IOException;
 public final class NullableBitReader extends ColumnReader {
 
   NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                    boolean fixedLength, ValueVector v) {
+                    boolean fixedLength, ValueVector v) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 596108c..5ac9bb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
 import org.apache.drill.exec.vector.ValueVector;
@@ -34,7 +35,7 @@ public abstract class NullableColumnReader extends ColumnReader{
   int bitsUsed;
 
   NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, ValueVector v){
+               boolean fixedLength, ValueVector v) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index 094e955..bb81024 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -17,16 +17,18 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
+
 import parquet.column.ColumnDescriptor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 public class NullableFixedByteAlignedReader extends NullableColumnReader {
 
-  NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                         boolean fixedLength, ValueVector v) {
+  private byte[] bytes;
+
+  NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
   }
 
@@ -34,15 +36,15 @@ public class NullableFixedByteAlignedReader extends NullableColumnReader {
   @Override
   protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
 
-    recordsReadInThisIteration = recordsToReadInThisPass;
-
-    readStartInBytes = pageReadStatus.readPosInBytes;
-    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
-    readLength = (int) Math.ceil(readLengthInBits / 8.0);
+    this.recordsReadInThisIteration = recordsToReadInThisPass;
 
-    bytes = pageReadStatus.pageDataByteArray;
-    // vectorData is assigned by the superclass read loop method
-    vectorData.writeBytes(bytes,
-        (int) readStartInBytes, (int) readLength);
+    // set up metadata
+    this.readStartInBytes = pageReadStatus.readPosInBytes;
+    this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+    this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+    this.bytes = pageReadStatus.pageDataByteArray;
+    
+    // fill in data.
+    vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 39d636d..c08dcf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -17,29 +17,28 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import parquet.bytes.BytesInput;
 import parquet.column.ValuesType;
 import parquet.column.page.Page;
-import parquet.column.page.PageReader;
 import parquet.column.values.ValuesReader;
 import parquet.format.PageHeader;
-
-import java.io.IOException;
-
-import static parquet.format.Util.readPageHeader;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 // class to keep track of the read position of variable length columns
 public final class PageReadStatus {
 
-  ColumnReader parentColumnReader;
-
+  private final ColumnReader parentColumnReader;
+  private final ColumnDataReader dataReader;
   // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
   Page currentPage;
-  // buffer to store bytes of current page, set to max size of parquet page
-  byte[] pageDataByteArray = new byte[ParquetRecordReader.PARQUET_PAGE_MAX_SIZE];
-
+  // buffer to store bytes of current page
+  byte[] pageDataByteArray;
   // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
   long readPosInBytes;
   // bit shift needed for the next page if the last one did not line up with a byte boundary
@@ -50,15 +49,25 @@ public final class PageReadStatus {
   // the number of values read out of the last page
   int valuesRead;
   int byteLength;
-  int rowGroupIndex;
+  //int rowGroupIndex;
   ValuesReader definitionLevels;
   ValuesReader valueReader;
 
-  PageReadStatus(ColumnReader parentStatus, int rowGroupIndex, ByteBuf bufferWithAllData){
+  PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
-    this.rowGroupIndex = rowGroupIndex;
+
+    long totalByteLength = columnChunkMetaData.getTotalSize();
+    long start = columnChunkMetaData.getFirstDataPageOffset();
+
+    try{
+      this.dataReader = new ColumnDataReader(fs, path, start, totalByteLength);
+    } catch (IOException e) {
+      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + path.getName(), e);
+    }
+    
   }
 
+  
   /**
    * Grab the next page.
    *
@@ -67,27 +76,15 @@ public final class PageReadStatus {
    */
   public boolean next() throws IOException {
 
-    int shift = 0;
-    if (rowGroupIndex == 0) shift = 0;
-    else shift = 4;
-    // first ROW GROUP has a different endpoint, because there are for bytes at the beginning of the file "PAR1"
-    if (parentColumnReader.readPositionInBuffer + shift == parentColumnReader.columnChunkMetaData.getFirstDataPageOffset() + parentColumnReader.columnChunkMetaData.getTotalSize()){
-      return false;
-    }
-    // TODO - in the JIRA for parquet steven put a stack trace for an error with a row group with 3 values in it
-    // the Math.min with the end of the buffer should fix it but now I'm not getting results back, leaving it here for now
-    // because it is needed, but there might be a problem with it
-    ByteBufInputStream f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
-        (int) parentColumnReader.readPositionInBuffer,
-        Math.min(200, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer)));
-    int before = f.available();
-    PageHeader pageHeader = readPageHeader(f);
-    int length = before - f.available();
-    f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
-        (int) parentColumnReader.readPositionInBuffer + length, pageHeader.getCompressed_page_size()));
+    if(!dataReader.hasRemainder()) return false;
+
+    // next, we need to decompress the bytes
+    PageHeader pageHeader = dataReader.readPageHeader();
 
     BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-        .decompress(BytesInput.from(f, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size(),
+        .decompress( //
+            dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // 
+            pageHeader.getUncompressed_page_size(), //
             parentColumnReader.columnChunkMetaData.getCodec());
     currentPage = new Page(
         bytesIn,
@@ -98,7 +95,6 @@ public final class PageReadStatus {
         ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
     );
 
-    parentColumnReader.readPositionInBuffer += pageHeader.compressed_page_size + length;
     byteLength = pageHeader.uncompressed_page_size;
 
     if (currentPage == null) {
@@ -106,12 +102,14 @@ public final class PageReadStatus {
     }
 
     // if the buffer holding each page's data is not large enough to hold the current page, re-allocate, with a little extra space
-    if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
-      pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
-    }
+//    if (pageHeader.getUncompressed_page_size() > pageDataByteArray.length) {
+//      pageDataByteArray = new byte[pageHeader.getUncompressed_page_size() + 100];
+//    }
     // TODO - would like to get this into the mainline, hopefully before alpha
     pageDataByteArray = currentPage.getBytes().toByteArray();
 
+    readPosInBytes = 0;
+    valuesRead = 0;
     if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
       definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
       valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
@@ -120,8 +118,10 @@ public final class PageReadStatus {
       readPosInBytes = endOfDefinitionLevels;
     }
 
-    readPosInBytes = 0;
-    valuesRead = 0;
     return true;
   }
+  
+  public void clear(){
+    this.dataReader.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index cb05273..aa01115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -35,6 +35,9 @@ import com.codahale.metrics.Timer;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
@@ -81,6 +84,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
   private List<RowGroupInfo> rowGroupInfos;
   private Stopwatch watch = new Stopwatch();
 
+  private List<SchemaPath> columns;
+
   public List<ReadEntryWithPath> getEntries() {
     return entries;
   }
@@ -103,9 +108,11 @@ public class ParquetGroupScan extends AbstractGroupScan {
   public ParquetGroupScan(@JsonProperty("entries") List<ReadEntryWithPath> entries,
                           @JsonProperty("storageengine") ParquetStorageEngineConfig storageEngineConfig,
                           @JacksonInject StorageEngineRegistry engineRegistry,
-                          @JsonProperty("ref") FieldReference ref
+                          @JsonProperty("ref") FieldReference ref,
+                          @JsonProperty("columns") List<SchemaPath> columns
                            )throws IOException, ExecutionSetupException {
     engineRegistry.init(DrillConfig.create());
+    this.columns = columns;
     this.storageEngine = (ParquetStorageEngine) engineRegistry.getEngine(storageEngineConfig);
     this.availableEndpoints = storageEngine.getContext().getBits();
     this.fs = storageEngine.getFileSystem();
@@ -116,9 +123,12 @@ public class ParquetGroupScan extends AbstractGroupScan {
     calculateEndpointBytes();
   }
 
-  public ParquetGroupScan(List<ReadEntryWithPath> entries,
-                          ParquetStorageEngine storageEngine, FieldReference ref) throws IOException {
+  public ParquetGroupScan(List<ReadEntryWithPath> entries, //
+                          ParquetStorageEngine storageEngine, // 
+                          FieldReference ref, //
+                          List<SchemaPath> columns) throws IOException {
     this.storageEngine = storageEngine;
+    this.columns = columns;
     this.engineConfig = storageEngine.getEngineConfig();
     this.availableEndpoints = storageEngine.getContext().getBits();
     this.fs = storageEngine.getFileSystem();
@@ -362,7 +372,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
       logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
     }
     Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
-    return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId), ref);
+    return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId), ref,
+            columns);
   }
 
   
@@ -375,6 +386,10 @@ public class ParquetGroupScan extends AbstractGroupScan {
     return rowGroupInfos.size();
   }
 
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
   @Override
   public OperatorCost getCost() {
     //TODO Figure out how to properly calculate cost

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 9a6b7ff..1e6c31a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.parquet;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -39,7 +38,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -58,7 +56,8 @@ public class ParquetRecordReader implements RecordReader {
   private static final int NUMBER_OF_VECTORS = 1;
   private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
   private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
-
+  private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+  
   // TODO - should probably find a smarter way to set this, currently 1 megabyte
   private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
   public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
@@ -73,18 +72,18 @@ public class ParquetRecordReader implements RecordReader {
   private int bitWidthAllFixedFields;
   private boolean allFieldsFixedLength;
   private int recordsPerBatch;
-  private ByteBuf bufferWithAllData;
   private final FieldReference ref;
   private long totalRecords;
   private long rowGroupOffset;
 
   private List<ColumnReader> columnStatuses;
-  private FileSystem fileSystem;
-  private BufferAllocator allocator;
+  FileSystem fileSystem;
+  BufferAllocator allocator;
   private long batchSize;
-  private Path hadoopPath;
+  Path hadoopPath;
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
+  private List<SchemaPath> columns;
 
   public CodecFactoryExposer getCodecFactoryExposer() {
     return codecFactoryExposer;
@@ -96,16 +95,17 @@ public class ParquetRecordReader implements RecordReader {
 
   public ParquetRecordReader(FragmentContext fragmentContext,
                              String path, int rowGroupIndex, FileSystem fs,
-                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, FieldReference ref) throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer, ref);
+                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, FieldReference ref,
+                             List<SchemaPath> columns) throws ExecutionSetupException {
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer, ref,
+        columns);
   }
 
-
   public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
                              String path, int rowGroupIndex, FileSystem fs,
-                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, FieldReference ref) throws ExecutionSetupException {
+                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, FieldReference ref,
+                             List<SchemaPath> columns) throws ExecutionSetupException {
     this.allocator = fragmentContext.getAllocator();
-
     hadoopPath = new Path(path);
     fileSystem = fs;
     this.ref = ref;
@@ -113,10 +113,7 @@ public class ParquetRecordReader implements RecordReader {
     this.rowGroupIndex = rowGroupIndex;
     this.batchSize = batchSize;
     this.footer = footer;
-  }
-
-  public ByteBuf getBufferWithAllData() {
-    return bufferWithAllData;
+    this.columns = columns;
   }
 
   public int getRowGroupIndex() {
@@ -149,22 +146,42 @@ public class ParquetRecordReader implements RecordReader {
     }
   }
 
+  private boolean fieldSelected(MaterializedField field){
+    // TODO - not sure if this is how we want to represent this
+    // for now it makes the existing tests pass, simply selecting
+    // all available data if no columns are provided
+    if (this.columns != null){
+      for (SchemaPath expr : this.columns){
+        if ( field.matches(expr)){
+          return true;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     
-
     columnStatuses = new ArrayList<>();
-
     totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
-
     List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
     allFieldsFixedLength = true;
     ColumnDescriptor column;
     ColumnChunkMetaData columnChunkMetaData;
+    int columnsToScan = 0;
 
+    MaterializedField field;
     // loop to add up the length of the fixed width columns and build the schema
     for (int i = 0; i < columns.size(); ++i) {
       column = columns.get(i);
+      field = MaterializedField.create(toFieldName(column.getPath()),
+          toMajorType(column.getType(), getDataMode(column)));
+      if ( ! fieldSelected(field)){
+        continue;
+      }
+      columnsToScan++;
       // sum the lengths of all of the fixed length fields
       if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
         // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder
@@ -178,10 +195,13 @@ public class ParquetRecordReader implements RecordReader {
       } else {
         allFieldsFixedLength = false;
       }
-
     }
     rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
 
+    // none of the columns in the parquet file matched the request columns from the query
+    if (columnsToScan == 0){
+      return;
+    }
     if (allFieldsFixedLength) {
       recordsPerBatch = (int) Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount());
     }
@@ -190,12 +210,14 @@ public class ParquetRecordReader implements RecordReader {
       ArrayList<VarLenBinaryReader.NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
       boolean fieldFixedLength = false;
-      MaterializedField field;
       for (int i = 0; i < columns.size(); ++i) {
         column = columns.get(i);
         columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
         field = MaterializedField.create(toFieldName(column.getPath()),
             toMajorType(column.getType(), getDataMode(column)));
+        // the field was not requested to be read
+        if ( ! fieldSelected(field)) continue;
+
         fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
         ValueVector v = TypeHelper.getNewVector(field, allocator);
         if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
@@ -213,10 +235,8 @@ public class ParquetRecordReader implements RecordReader {
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }
-    
-    
-    output.removeAllFields();
 
+    output.removeAllFields();
     try {
       for (ColumnReader crs : columnStatuses) {
         output.addField(crs.valueVecHolder.getValueVector());
@@ -232,48 +252,6 @@ public class ParquetRecordReader implements RecordReader {
       throw new ExecutionSetupException("Error setting up output mutator.", e);
     }
 
-    // the method for reading into a ByteBuf from a stream copies all of the data into a giant buffer
-    // here we do the same thing in a loop to not initialize so much on heap
-
-    // TODO - this should be replaced by an enhancement in Hadoop 2.0 that will allow reading
-    // directly into a ByteBuf passed into the reading method
-    int totalByteLength = 0;
-    long start = 0;
-    if (rowGroupIndex == 0){
-      totalByteLength = 4;
-    }
-    else{
-      start = rowGroupOffset;
-    }
-    // TODO - the methods for get total size and get total uncompressed size seem to have the opposite results of
-    // what they should
-    // I found the bug in the mainline and made a issue for it, hopefully it will be fixed soon
-    for (ColumnReader crs : columnStatuses){
-      totalByteLength += crs.columnChunkMetaData.getTotalSize();
-    }
-    for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
-      totalByteLength += r.columnChunkMetaData.getTotalSize();
-    }
-    for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){
-      totalByteLength += r.columnChunkMetaData.getTotalSize();
-    }
-    int bufferSize = 64*1024;
-    long totalBytesWritten = 0;
-    int validBytesInCurrentBuffer;
-    byte[] buffer = new byte[bufferSize];
-    
-    try (FSDataInputStream inputStream = fileSystem.open(hadoopPath)) {
-      bufferWithAllData = allocator.buffer(totalByteLength);
-      inputStream.seek(start);
-      while (totalBytesWritten < totalByteLength){
-        validBytesInCurrentBuffer = (int) Math.min(bufferSize, totalByteLength - totalBytesWritten);
-        inputStream.read(buffer, 0 , validBytesInCurrentBuffer);
-        bufferWithAllData.writeBytes(buffer, 0 , (int) validBytesInCurrentBuffer);
-        totalBytesWritten += validBytesInCurrentBuffer;
-      }
-    } catch (IOException e) {
-      throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + hadoopPath.getName());
-    }
   }
 
   private SchemaPath toFieldName(String[] paths) {
@@ -317,7 +295,7 @@ public class ParquetRecordReader implements RecordReader {
    */
   private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
                                           ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v)
-      throws SchemaChangeException {
+      throws SchemaChangeException, ExecutionSetupException {
     // if the column is required
     if (descriptor.getMaxDefinitionLevel() == 0){
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
@@ -371,8 +349,7 @@ public class ParquetRecordReader implements RecordReader {
       if (allFieldsFixedLength) {
         recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
       } else {
-        // arbitrary
-        recordsToRead = 4000;
+        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
 
         // going to incorporate looking at length of values and copying the data into a single loop, hopefully it won't
         // get too complicated
@@ -504,8 +481,17 @@ public class ParquetRecordReader implements RecordReader {
 
   @Override
   public void cleanup() {
+    for (ColumnReader column : columnStatuses) {
+      column.clear();
+    }
     columnStatuses.clear();
-    bufferWithAllData.release();
+
+    for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
+      r.clear();
+    }
+    for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){
+      r.clear();
+    }
     varLengthReader.columns.clear();
     varLengthReader.nullableColumns.clear();
   }
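
The setup hunk above skips any column for which fieldSelected(field) returns false, but that method's body is outside this excerpt. Below is a minimal standalone sketch of the selection idea, using plain strings in place of Drill's MaterializedField and SchemaPath types; ColumnSelectionSketch and its names are illustrative assumptions, not the reader's actual code.

import java.util.Arrays;
import java.util.List;

public class ColumnSelectionSketch {

  // Hypothetical stand-in for the reader's requested-column list; a null list
  // means the plan carried no "columns" attribute, so every column is read.
  private final List<String> requestedColumns;

  public ColumnSelectionSketch(List<String> requestedColumns) {
    this.requestedColumns = requestedColumns;
  }

  // Rough shape of the fieldSelected(...) check: keep a column only if it was
  // named in the requested list, or if no selection was given at all.
  boolean fieldSelected(String fieldName) {
    if (requestedColumns == null) {
      return true;
    }
    for (String requested : requestedColumns) {
      if (requested.equalsIgnoreCase(fieldName)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    ColumnSelectionSketch selectAll = new ColumnSelectionSketch(null);
    ColumnSelectionSketch selective =
        new ColumnSelectionSketch(Arrays.asList("integer", "bin2"));
    System.out.println(selectAll.fieldSelected("bigInt"));   // true
    System.out.println(selective.fieldSelected("bigInt"));   // false
    System.out.println(selective.fieldSelected("bin2"));     // true
  }
}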

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 9a94ec2..b3ce9b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -23,8 +23,9 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.ReadEntryFromHDFS;
 import org.apache.drill.exec.physical.base.AbstractBase;
@@ -51,22 +52,31 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   private final ParquetStorageEngine parquetStorageEngine;
   private final List<RowGroupReadEntry> rowGroupReadEntries;
   private final FieldReference ref;
+  private final List<SchemaPath> columns;
 
   @JsonCreator
-  public ParquetRowGroupScan(@JacksonInject StorageEngineRegistry registry, @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
-                             @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, @JsonProperty("ref") FieldReference ref) throws ExecutionSetupException {
+  public ParquetRowGroupScan(@JacksonInject StorageEngineRegistry registry,
+                             @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
+                             @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
+                             @JsonProperty("ref") FieldReference ref,
+                             @JsonProperty("columns") List<SchemaPath> columns
+                             ) throws ExecutionSetupException {
     parquetStorageEngine = (ParquetStorageEngine) registry.getEngine(engineConfig);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.engineConfig = engineConfig;
     this.ref = ref;
+    this.columns = columns;
   }
 
   public ParquetRowGroupScan(ParquetStorageEngine engine, ParquetStorageEngineConfig config,
-                              List<RowGroupReadEntry> rowGroupReadEntries, FieldReference ref) {
+                              List<RowGroupReadEntry> rowGroupReadEntries, FieldReference ref,
+                              List<SchemaPath> columns
+                              ) {
     parquetStorageEngine = engine;
     engineConfig = config;
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.ref = ref;
+    this.columns = columns;
   }
 
   public List<RowGroupReadEntry> getRowGroupReadEntries() {
@@ -108,9 +118,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(parquetStorageEngine, (ParquetStorageEngineConfig) engineConfig, rowGroupReadEntries, ref);
+    return new ParquetRowGroupScan(parquetStorageEngine, (ParquetStorageEngineConfig) engineConfig, rowGroupReadEntries,
+            ref, columns);
   }
 
   @Override
@@ -118,6 +129,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return Iterators.emptyIterator();
   }
 
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
   public static class RowGroupReadEntry extends ReadEntryFromHDFS {
 
     private int rowGroupIndex;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index f73ee84..966a16b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -20,7 +20,9 @@ package org.apache.drill.exec.store.parquet;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
@@ -48,6 +50,8 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
   public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
+    // keep footers in a map to avoid re-reading them
+    Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
     for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
       /*
       Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
@@ -57,12 +61,17 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       These fields will be added to the constructor below
       */
       try {
+        if ( ! footers.containsKey(e.getPath())){
+          footers.put(e.getPath(),
+              ParquetFileReader.readFooter( rowGroupScan.getStorageEngine().getFileSystem().getConf(), new Path(e.getPath())));
+        }
         readers.add(
             new ParquetRecordReader(
                 context, e.getPath(), e.getRowGroupIndex(), rowGroupScan.getStorageEngine().getFileSystem(),
                 rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
-                ParquetFileReader.readFooter( rowGroupScan.getStorageEngine().getFileSystem().getConf(), new Path(e.getPath())),
-                rowGroupScan.getRef()
+                footers.get(e.getPath()),
+                rowGroupScan.getRef(),
+                rowGroupScan.getColumns()
             )
         );
       } catch (IOException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
index 45b9cc1..ad9756e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -58,7 +59,6 @@ public class ParquetStorageEngine extends AbstractStorageEngine{
   private final DrillbitContext context;
   static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
   private CodecFactoryExposer codecFactoryExposer;
-  final ParquetMetadata footer;
   private final ParquetSchemaProvider schemaProvider;
   private final ParquetStorageEngineConfig engineConfig;
 
@@ -67,7 +67,6 @@ public class ParquetStorageEngine extends AbstractStorageEngine{
     this.schemaProvider = new ParquetSchemaProvider(configuration, context.getConfig());
     codecFactoryExposer = new CodecFactoryExposer(schemaProvider.conf);
     this.engineConfig = configuration;
-    this.footer = null;
   }
 
   public Configuration getHadoopConfig() {
@@ -99,7 +98,7 @@ public class ParquetStorageEngine extends AbstractStorageEngine{
     ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
         new TypeReference<ArrayList<ReadEntryWithPath>>() {});
 
-    return new ParquetGroupScan( Lists.newArrayList(readEntries), this, scan.getOutputReference());
+    return new ParquetGroupScan( readEntries, this, scan.getOutputReference(), null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 276a4df..0321838 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
@@ -43,7 +44,7 @@ public class VarLenBinaryReader {
 
   public static class VarLengthColumn extends ColumnReader {
 
-    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) {
+    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
       super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
     }
 
@@ -58,7 +59,7 @@ public class VarLenBinaryReader {
     int nullsRead;
     boolean currentValNull = false;
 
-    NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) {
+    NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
       super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
     }
 
@@ -118,6 +119,12 @@ public class VarLenBinaryReader {
         lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
       }
       for (NullableVarLengthColumn columnReader : nullableColumns) {
+        // check to make sure there is capacity for the next value (for nullables this is a check to see if there is
+        // still space in the nullability recording vector)
+        if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+          rowGroupFinished = true;
+          break;
+        }
         if (columnReader.pageReadStatus.currentPage == null
             || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
           columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
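
The new guard above ends the current pass once recordsReadInCurrentPass reaches the nullable vector's value capacity, so the batch can be handed off before the nullability-tracking vector overflows. Below is a minimal standalone sketch of that fill-until-capacity pattern; ValueBuffer and fillBatch are illustrative stand-ins, not Drill classes.

import java.util.ArrayList;
import java.util.List;

public class CapacityGuardSketch {

  // Illustrative stand-in for a value vector with a fixed value capacity.
  static class ValueBuffer {
    private final int capacity;
    private final List<Integer> values = new ArrayList<Integer>();

    ValueBuffer(int capacity) {
      this.capacity = capacity;
    }

    int getValueCapacity() {
      return capacity;
    }

    void write(int v) {
      values.add(v);
    }
  }

  // Copy incoming values into the buffer, but stop once capacity is reached
  // so the caller can emit the full batch and start a new one; this is the
  // same role the added capacity check plays before the next nullable value.
  static int fillBatch(ValueBuffer buffer, int[] incoming) {
    int recordsReadInCurrentPass = 0;
    for (int value : incoming) {
      if (recordsReadInCurrentPass == buffer.getValueCapacity()) {
        break;
      }
      buffer.write(value);
      recordsReadInCurrentPass++;
    }
    return recordsReadInCurrentPass;
  }

  public static void main(String[] args) {
    ValueBuffer buffer = new ValueBuffer(4);
    int read = fillBatch(buffer, new int[]{1, 2, 3, 4, 5, 6});
    System.out.println(read); // prints 4: the batch stops at capacity
  }
}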

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 2be3e8d..62b9dfa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -33,6 +33,9 @@ import mockit.Expectations;
 import mockit.Injectable;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -136,7 +139,7 @@ public class JSONRecordReaderTest {
     };
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_1.json").toURI().toString(),
-        FileSystem.getLocal(new Configuration()), null);
+        FileSystem.getLocal(new Configuration()), null, null);
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -166,7 +169,7 @@ public class JSONRecordReaderTest {
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
-        FileSystem.getLocal(new Configuration()), null);
+        FileSystem.getLocal(new Configuration()), null, null);
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -196,6 +199,41 @@ public class JSONRecordReaderTest {
   }
 
   @Test
+  public void testChangedSchemaInTwoBatchesColumnSelect(@Injectable final FragmentContext context) throws IOException,
+      ExecutionSetupException {
+    new Expectations() {
+      {
+        context.getAllocator();
+        returns(new DirectBufferAllocator());
+      }
+    };
+
+    JSONRecordReader jr = new JSONRecordReader(context,
+        FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
+        FileSystem.getLocal(new Configuration()),
+        64, null, Arrays.asList(new SchemaPath("test", ExpressionPosition.UNKNOWN))); // batch only fits 1 int
+    MockOutputMutator mutator = new MockOutputMutator();
+    List<ValueVector> addFields = mutator.getAddFields();
+    List<MaterializedField> removedFields = mutator.getRemovedFields();
+
+    jr.setup(mutator);
+    assertEquals(1, jr.next());
+    assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
+    assertTrue(removedFields.isEmpty());
+    assertEquals(addFields.size(), 1);
+    assertEquals(1, jr.next());
+    assertField(addFields.get(0), 0, MinorType.INT, 1234, "test");
+    assertEquals(addFields.size(), 1);
+    assertTrue(removedFields.isEmpty());
+    removedFields.clear();
+    assertEquals(1, jr.next());
+    assertField(addFields.get(0), 0, MinorType.INT, 12345, "test");
+    assertEquals(addFields.size(), 1);
+    assertTrue(removedFields.isEmpty());
+    assertEquals(0, jr.next());
+  }
+
+  @Test
   public void testChangedSchemaInTwoBatches(@Injectable final FragmentContext context) throws IOException,
       ExecutionSetupException {
     new Expectations() {
@@ -208,7 +246,7 @@ public class JSONRecordReaderTest {
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()),
-        64, null); // batch only fits 1 int
+        64, null, null); // batch only fits 1 int
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     List<MaterializedField> removedFields = mutator.getRemovedFields();
@@ -266,7 +304,7 @@ public class JSONRecordReaderTest {
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_3.json").toURI().toString(),
-        FileSystem.getLocal(new Configuration()), null);
+        FileSystem.getLocal(new Configuration()), null, null);
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -295,7 +333,7 @@ public class JSONRecordReaderTest {
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_4.json").toURI().toString(),
-        FileSystem.getLocal(new Configuration()), null);
+        FileSystem.getLocal(new Configuration()), null, null);
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
@@ -328,7 +366,7 @@ public class JSONRecordReaderTest {
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_5.json").toURI().toString(),
-        FileSystem.getLocal(new Configuration()), null);
+        FileSystem.getLocal(new Configuration()), null, null);
 
     MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
index 8efc762..6f568fb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
@@ -151,6 +151,32 @@ public class ParquetRecordReaderTest {
     testParquetFullEngine(false, "/parquet_nullable_varlen.json", "/tmp/nullable.parquet", 1, props);
   }
 
+  /**
+   * Tests the scan node attribute that limits which columns a scan reads.
+   *
+   * Selecting all columns is covered by the other tests, which omit the attribute.
+   * @throws Exception
+   */
+  @Test
+  public void testSelectColumnRead() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    // generate metadata for a series of test columns; these columns are all generated in the test file
+    populateFieldInfoMap(props);
+    generateParquetFile("/tmp/test.parquet", props);
+    fields.clear();
+    // create a new object to describe the dataset expected out of the scan operation
+    // the fields added below match those requested in the plan specified in parquet_selective_column_read.json
+    // that is used below in the test query
+    props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    props.fields.put("integer", new FieldInfo("int32", "integer", 32, intVals, TypeProtos.MinorType.INT, props));
+    props.fields.put("bigInt", new FieldInfo("int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT, props));
+    props.fields.put("bin", new FieldInfo("binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY, props));
+    props.fields.put("bin2", new FieldInfo("binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY, props));
+    testParquetFullEngineEventBased(false, "/parquet_selective_column_read.json", null, "/tmp/test.parquet", 1, props, false);
+  }
+
+
   @Test
   public void testMultipleRowGroupsAndReads() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -165,7 +191,7 @@ public class ParquetRecordReaderTest {
         readEntries += ",";
     }
     testParquetFullEngineEventBased(true, "/parquet_scan_screen_read_entry_replace.json", readEntries,
-        "/tmp/test.parquet", i, props);
+        "/tmp/test.parquet", i, props, true);
   }
 
   // requires binary file generated by pig from TPCH data, also have to disable assert where data is coming in
@@ -185,18 +211,18 @@ public class ParquetRecordReaderTest {
         readEntries += ",";
     }
     testParquetFullEngineEventBased(false, "/parquet_scan_screen_read_entry_replace.json", readEntries,
-        "/tmp/test.parquet", i, props);
+        "/tmp/test.parquet", i, props, true);
   }
-
+/*
   @Test
   public void testMultipleRowGroupsEvent() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
-    populateFieldInfoMap(props);
-    testParquetFullEngineEventBased(true, "/parquet_scan_screen.json", "/tmp/test.parquet", 1, props);
+    ParquetTestProperties props = new ParquetTestProperties(10, 30000, DEFAULT_BYTES_PER_PAGE, fields);
+    props.fields.put("a", new FieldInfo("a", "asdf", 1, new Object[3], TypeProtos.MinorType.BIGINT, props));
+    testParquetFullEngineEventBased(false, "/parquet_scan_screen.json", "/tmp/out", 1, props);
   }
 
-
+*/
   private class ParquetTestProperties{
     int numberRowGroups;
     int recordsPerRowGroup;
@@ -462,14 +488,17 @@ public class ParquetRecordReaderTest {
     }
   }
 
+
+
   // specific tests should call this method, but it is not marked as a test itself intentionally
   public void testParquetFullEngineEventBased(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{
-    testParquetFullEngine(generateNew, plan, null, filename, numberOfTimesRead, props);
+    testParquetFullEngineEventBased(generateNew, plan, null, filename, numberOfTimesRead, props, true);
   }
 
   // specific tests should call this method, but it is not marked as a test itself intentionally
   public void testParquetFullEngineEventBased(boolean generateNew, String plan, String readEntries, String filename,
-                                              int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{
+                                              int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props,
+                                              boolean runAsLogicalPlan) throws Exception{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
 
     if (generateNew) generateParquetFile(filename, props);
@@ -482,12 +511,15 @@ public class ParquetRecordReaderTest {
       RecordBatchLoader batchLoader = new RecordBatchLoader(bit1.getContext().getAllocator());
       ParquetResultListener resultListener = new ParquetResultListener(batchLoader, props);
       long C = System.nanoTime();
-      if (readEntries != null){
-        client.runQuery(UserProtos.QueryType.LOGICAL, (Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries)), resultListener);
-      }
-      else{
-        client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
+      String planText = Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8);
+      // substitute the read entries into the plan text, allowing reuse of the plan file across several tests
+      if (readEntries != null) {
+        planText = planText.replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
       }
+      if (runAsLogicalPlan)
+        client.runQuery(UserProtos.QueryType.LOGICAL, planText, resultListener);
+      else
+        client.runQuery(UserProtos.QueryType.PHYSICAL, planText, resultListener);
       resultListener.getResults();
       for (String s : resultListener.valuesChecked.keySet()) {
         assertEquals("Record count incorrect for column: " + s,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/test/resources/parquet_nullable_varlen.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet_nullable_varlen.json b/exec/java-exec/src/test/resources/parquet_nullable_varlen.json
new file mode 100644
index 0000000..c21beec
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet_nullable_varlen.json
@@ -0,0 +1,41 @@
+{
+  head:{
+    type:"APACHE_DRILL_LOGICAL",
+    version:"1",
+    generator:{
+      type:"manual",
+      info:"na"
+    }
+  },
+  storage:{
+    "parquet" :
+      {
+        "type":"parquet",
+        "dfsName" : "file:///"
+      }
+  },
+  query:[
+    {
+      @id:"1",
+      op:"scan",
+      memo:"initial_scan",
+      storageengine:"parquet",
+      selection: [
+        {
+            path: "/tmp/nullable_varlen.parquet"
+        }
+      ]
+    },
+    {
+      @id:"2",
+      input: 1,
+      op: "store",
+      memo: "output sink",
+      target: {
+        file: "console:///stdout"
+      }
+
+    }
+
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dfadc9d/exec/java-exec/src/test/resources/parquet_selective_column_read.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet_selective_column_read.json b/exec/java-exec/src/test/resources/parquet_selective_column_read.json
new file mode 100644
index 0000000..61e582d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet_selective_column_read.json
@@ -0,0 +1,28 @@
+{
+  head : {
+    version : 1,
+    generator : {
+      type : "manual",
+      info : "na"
+    },
+    type : "APACHE_DRILL_PHYSICAL"
+  },
+  graph : [ {
+    pop : "parquet-scan",
+    @id : 1,
+    entries : [ {
+      path : "/tmp/test.parquet"
+    } ],
+    storageengine : {
+      type : "parquet",
+      dfsName : "file:///"
+    },
+    columns: [ "integer", "bigInt", "bin", "bin2"],
+    fragmentPointer : 0
+  }, {
+    pop : "screen",
+    @id : 2,
+    child : 1
+  } ]
+}
+

