drill-commits mailing list archives

From timothyfar...@apache.org
Subject [drill] 03/03: DRILL-6353: Upgrade Parquet MR dependencies
Date Thu, 14 Jun 2018 04:11:22 GMT
This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ac8e69847659582e36c89fd52bb0856ab3bfbd21
Author: Vlad Rozov <vrozov@apache.org>
AuthorDate: Wed May 9 13:24:11 2018 -0700

    DRILL-6353: Upgrade Parquet MR dependencies
    
    closes #1259
---
 contrib/storage-hive/hive-exec-shade/pom.xml       | 25 +++++--
 exec/java-exec/pom.xml                             | 75 ---------------------
 .../exec/expr/stat/ParquetComparisonPredicate.java | 14 ++--
 .../drill/exec/expr/stat/ParquetIsPredicate.java   |  9 +--
 .../exec/expr/stat/ParquetPredicatesHelper.java    |  4 +-
 .../parquet/AbstractParquetScanBatchCreator.java   | 14 +++-
 .../drill/exec/store/parquet/ColumnDataReader.java |  9 +--
 .../drill/exec/store/parquet/FooterGatherer.java   |  4 +-
 .../parquet/ParquetDirectByteBufferAllocator.java  | 76 ++++++----------------
 .../exec/store/parquet/ParquetRecordWriter.java    | 13 +++-
 .../store/parquet/columnreaders/PageReader.java    | 51 +++++++++------
 .../columnreaders/VarLenBulkPageReader.java        | 17 ++---
 .../columnreaders/VarLenColumnBulkInput.java       |  4 +-
 .../exec/store/parquet/metadata/Metadata.java      | 16 ++++-
 .../filereader/BufferedDirectBufInputStream.java   |  8 +--
 .../exec/util/filereader/DirectBufInputStream.java | 11 +++-
 .../parquet/hadoop/ColumnChunkIncReadStore.java    | 39 ++++++-----
 .../hadoop/ParquetColumnChunkPageWriteStore.java   | 10 +--
 .../store/parquet/TestParquetMetadataCache.java    |  4 ++
 pom.xml                                            | 40 +++++++++++-
 20 files changed, 209 insertions(+), 234 deletions(-)

diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml b/contrib/storage-hive/hive-exec-shade/pom.xml
index 6f511ad..98fd4b8 100644
--- a/contrib/storage-hive/hive-exec-shade/pom.xml
+++ b/contrib/storage-hive/hive-exec-shade/pom.xml
@@ -31,6 +31,20 @@
   <packaging>jar</packaging>
   <name>contrib/hive-storage-plugin/hive-exec-shaded</name>
 
+  <properties>
+    <hive.parquet.version>1.8.3</hive.parquet.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop-bundle</artifactId>
+        <version>${hive.parquet.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -68,11 +82,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <!--Once newer hive-exec version leverages parquet-column 1.9.0, this dependency can be deleted -->
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
@@ -83,7 +92,7 @@
           <artifactSet>
             <includes>
               <include>org.apache.hive:hive-exec</include>
-              <include>org.apache.parquet:parquet-column</include>
+              <include>org.apache.parquet:parquet-hadoop-bundle</include>
               <include>commons-codec:commons-codec</include>
               <include>com.fasterxml.jackson.core:jackson-databind</include>
               <include>com.fasterxml.jackson.core:jackson-annotations</include>
@@ -118,6 +127,10 @@
               <shadedPattern>hive.org.apache.parquet.</shadedPattern>
             </relocation>
             <relocation>
+              <pattern>shaded.parquet.</pattern>
+              <shadedPattern>hive.shaded.parquet.</shadedPattern>
+            </relocation>
+            <relocation>
               <pattern>org.apache.avro.</pattern>
               <shadedPattern>hive.org.apache.avro.</shadedPattern>
             </relocation>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2205c2f..7701e76 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -249,92 +249,17 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
       <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-format</artifactId>
-      <version>2.3.0-incubating</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
       <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-jackson</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-encoding</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-generator</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>javax.inject</groupId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 9e561ad..ebceefb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -113,7 +113,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
       // can drop when left's max < right's min, or right's max < left's min
       final C leftMin = leftStat.genericGetMin();
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0;
+      return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0);
     }) {
       @Override
       public String toString() {
@@ -132,7 +132,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when left's max <= right's min.
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) <= 0;
+      return leftStat.compareMaxToValue(rightMin) <= 0;
     });
   }
 
@@ -146,7 +146,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when left's max < right's min.
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) < 0;
+      return leftStat.compareMaxToValue(rightMin) < 0;
     });
   }
 
@@ -160,7 +160,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when right's max <= left's min.
       final C leftMin = leftStat.genericGetMin();
-      return rightStat.genericGetMax().compareTo(leftMin) <= 0;
+      return rightStat.compareMaxToValue(leftMin) <= 0;
     });
   }
 
@@ -173,7 +173,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when right's max < left's min.
       final C leftMin = leftStat.genericGetMin();
-      return rightStat.genericGetMax().compareTo(leftMin) < 0;
+      return rightStat.compareMaxToValue(leftMin) < 0;
     });
   }
 
@@ -188,8 +188,8 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
       // can drop when there is only one unique value.
       final C leftMax = leftStat.genericGetMax();
       final C rightMax = rightStat.genericGetMax();
-      return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 &&
-          leftStat.genericGetMax().compareTo(rightMax) == 0;
+      return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 &&
+          leftStat.compareMaxToValue(rightMax) == 0;
     });
   }
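
[Editor's note] The hunks above move from Comparable-based min/max checks to the compareMinToValue/compareMaxToValue methods that parquet-column 1.10 exposes on Statistics, which compare through the column's primitive comparator instead of the raw min/max objects. A minimal sketch of the "ranges cannot overlap" test using only public parquet-column API (the class and helper below are illustrative, not Drill code):

    import org.apache.parquet.column.statistics.Statistics;

    public class RangeOverlapSketch {
      // True when the two value ranges cannot intersect, i.e. the row group can be
      // dropped for an equality comparison (mirrors the "can drop" lambda above).
      static <C extends Comparable<C>> boolean disjoint(Statistics<C> left, Statistics<C> right) {
        return left.compareMaxToValue(right.genericGetMin()) < 0
            || right.compareMaxToValue(left.genericGetMin()) < 0;
      }
    }
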
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
index 9b04102..547dc06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.TypedFieldExpr;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 
 import java.util.ArrayList;
@@ -114,7 +115,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if max value is not true or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount())
+        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
     );
   }
 
@@ -124,7 +125,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if min value is not false or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount())
+        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
     );
   }
 
@@ -134,7 +135,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if min value is not false or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
+        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
     );
   }
 
@@ -144,7 +145,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if max value is not true or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
+        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
     );
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index 7ff1036..f804a7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -43,7 +43,7 @@ class ParquetPredicatesHelper {
    *          False if at least one row is not null.
    */
   static boolean isAllNulls(Statistics stat, long rowCount) {
-    return stat.getNumNulls() == rowCount;
+    return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
   }
 
   /**
@@ -54,7 +54,7 @@ class ParquetPredicatesHelper {
    *          False if the parquet file hasn't nulls.
    */
   static boolean hasNoNulls(Statistics stat) {
-    return stat.getNumNulls() == 0;
+    return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 6a320b8..dc09ce1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -33,11 +33,12 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
@@ -50,6 +51,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
 public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
@@ -146,11 +150,15 @@ public abstract class AbstractParquetScanBatchCreator {
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
-    Configuration newConf = new Configuration(conf);
+    conf = new Configuration(conf);
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
     conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
-    return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
+    conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
+    ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+      return reader.getFooter();
+    }
   }
 
   private boolean isComplex(ParquetMetadata footer) {
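
[Editor's note] The readFooter() rewrite above follows the parquet-hadoop 1.10 API: the static ParquetFileReader.readFooter(...) call is replaced by opening the file through HadoopInputFile with explicit ParquetReadOptions and taking the footer from the reader. A minimal standalone sketch of that pattern, assuming only a placeholder file path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    import java.io.IOException;

    import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

    public class FooterReadSketch {
      static ParquetMetadata readFooter(Configuration conf, Path path) throws IOException {
        // NO_FILTER requests the complete footer, matching the old readFooter(..., NO_FILTER) call
        ParquetReadOptions options = ParquetReadOptions.builder()
            .withMetadataFilter(NO_FILTER)
            .build();
        // try-with-resources closes the underlying input stream once the footer is extracted
        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
          return reader.getFooter();
        }
      }

      public static void main(String[] args) throws IOException {
        Path path = new Path("/tmp/example.parquet"); // placeholder path
        System.out.println(readFooter(new Configuration(), path).getFileMetaData().getSchema());
      }
    }
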
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index dcd40cf..79294da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -21,14 +21,13 @@ import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 public class ColumnDataReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -58,11 +57,7 @@ public class ColumnDataReader {
 
   public void loadPage(DrillBuf target, int pageLength) throws IOException {
     target.clear();
-    ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
-    int lengthLeftToRead = pageLength;
-    while (lengthLeftToRead > 0) {
-      lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead);
-    }
+    HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
     target.writerIndex(pageLength);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index ea34c7d..d1562c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -42,6 +42,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import com.google.common.base.Preconditions;
 
 import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 
 public class FooterGatherer {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -160,7 +161,8 @@ public class FooterGatherer {
         footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
       }
 
-      ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes));
+      final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
+      ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER);
       Footer footer = new Footer(status.getPath(), metadata);
       return footer;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 09f1b26..ba6aac9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -17,10 +17,11 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -30,44 +31,42 @@ import org.apache.parquet.bytes.ByteBufferAllocator;
 /**
  * {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release
  * {@link ByteBuffer} objects.<br>
- * To properly release an allocated {@link ByteBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
+ * To properly release an allocated {@link DrillBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
  * that was passed to the Parquet library.
  */
 public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
 
   private final BufferAllocator allocator;
-  private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
+  private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new IdentityHashMap<>();
 
-  public ParquetDirectByteBufferAllocator(OperatorContext o){
-    allocator = o.getAllocator();
+  public ParquetDirectByteBufferAllocator(OperatorContext o) {
+    this(o.getAllocator());
   }
 
   public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
-
   @Override
   public ByteBuffer allocate(int sz) {
-    ByteBuf bb = allocator.buffer(sz);
-    ByteBuffer b = bb.nioBuffer(0, sz);
-    final Key key = new Key(b);
-    allocatedBuffers.put(key, bb);
-    logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash);
-    return b;
+    DrillBuf drillBuf = allocator.buffer(sz);
+    ByteBuffer byteBuffer = drillBuf.nioBuffer(0, sz);
+    allocatedBuffers.put(byteBuffer, drillBuf);
+    logger.debug("{}: Allocated {} bytes. Allocated DrillBuf with id {} and ByteBuffer {}", this, sz, drillBuf.getId(), System.identityHashCode(byteBuffer));
+    return byteBuffer;
   }
 
   @Override
-  public void release(ByteBuffer b) {
-    final Key key = new Key(b);
-    final ByteBuf bb = allocatedBuffers.get(key);
+  public void release(ByteBuffer byteBuffer) {
+    final DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer);
     // The ByteBuffer passed in may already have been freed or not allocated by this allocator.
     // If it is not found in the allocated buffers, do nothing
-    if(bb != null) {
-      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash);
-      bb.release();
-      allocatedBuffers.remove(key);
+    if (drillBuf != null) {
+      logger.debug("{}: Freed DrillBuf with id {} and ByteBuffer {}", this, drillBuf.getId(), System.identityHashCode(byteBuffer));
+      drillBuf.release();
+    } else {
+      logger.warn("{}: ByteBuffer {} is not present", this, System.identityHashCode(byteBuffer));
     }
   }
 
@@ -75,41 +74,4 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   public boolean isDirect() {
     return true;
   }
-
-  /**
-   * ByteBuffer wrapper that computes a fixed hashcode.
-   * <br><br>
-   * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding
-   * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.<br>
-   * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and uses reference equality in case
-   * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to
-   * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}.
-   */
-  private class Key {
-    final int hash;
-    final ByteBuffer buffer;
-
-    Key(final ByteBuffer buffer) {
-      this.buffer = buffer;
-      // remember, we can't use buffer.hashCode()
-      this.hash = System.identityHashCode(buffer);
-    }
-
-    @Override
-    public int hashCode() {
-      return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (!(obj instanceof Key)) {
-        return false;
-      }
-      final Key key = (Key) obj;
-      return hash == key.hash && buffer == key.buffer;
-    }
-  }
 }
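
[Editor's note] The allocator change above drops the hand-rolled Key wrapper in favor of an IdentityHashMap keyed directly by the ByteBuffer. ByteBuffer.equals()/hashCode() are content- and position-based, so they change as Parquet consumes the buffer, while IdentityHashMap keys on reference identity, which is exactly what the old System.identityHashCode-based Key provided. An illustrative, Drill-independent sketch of the same bookkeeping idea:

    import java.nio.ByteBuffer;
    import java.util.IdentityHashMap;
    import java.util.Map;

    public class IdentityTrackingSketch {
      // Tracks each handed-out ByteBuffer by reference, not by (mutable) content.
      private final Map<ByteBuffer, Long> allocated = new IdentityHashMap<>();
      private long nextId;

      ByteBuffer allocate(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        allocated.put(buffer, nextId++);
        return buffer;
      }

      void release(ByteBuffer buffer) {
        Long id = allocated.remove(buffer); // identity lookup: same object allocate() returned
        if (id == null) {
          System.err.println("unknown buffer " + System.identityHashCode(buffer));
        }
      }
    }
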
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 0e40c9e..0917926 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -54,8 +54,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -241,8 +243,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     // once PARQUET-1006 will be resolved
     pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
         pageSize, new ParquetDirectByteBufferAllocator(oContext));
-    store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary,
-        writerVersion, new ParquetDirectByteBufferAllocator(oContext));
+    ParquetProperties parquetProperties = ParquetProperties.builder()
+        .withPageSize(pageSize)
+        .withDictionaryEncoding(enableDictionary)
+        .withDictionaryPageSize(initialPageBufferSize)
+        .withWriterVersion(writerVersion)
+        .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
+        .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+        .build();
+    store = new ColumnWriteStoreV1(pageStore, parquetProperties);
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
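
[Editor's note] The writer change above moves from the multi-argument ColumnWriteStoreV1 constructor to the ParquetProperties builder introduced in parquet-column 1.10, which carries the page size, dictionary settings, writer version and values-writer factory. A minimal sketch of building such properties; the sizes below are placeholders, not Drill's configuration:

    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;
    import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;

    public class WriterPropertiesSketch {
      static ParquetProperties exampleProperties() {
        return ParquetProperties.builder()
            .withPageSize(1024 * 1024)            // 1 MiB data pages (placeholder)
            .withDictionaryEncoding(true)
            .withDictionaryPageSize(64 * 1024)    // placeholder dictionary page size
            .withWriterVersion(WriterVersion.PARQUET_1_0)
            .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
            .build();
      }
    }
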
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index bf75695..01d0644 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBufUtil;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
@@ -250,7 +252,7 @@ class PageReader {
   }
 
   public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException {
-    return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
+    return BytesInput.from(buf.nioBuffer(offset, length));
   }
 
 
@@ -319,41 +321,44 @@ class PageReader {
 
     byteLength = pageHeader.uncompressed_page_size;
 
-    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
+    final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
       repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
-      repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      repetitionLevels.initFromPage(currentPageCount, in);
       // we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
       // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
       // read the first zero here to simplify the reading processes, and start reading the first value the same as all
       // of the rest. Effectively we are 'reading' the non-existent value in front of the first allowing direct access to
       // the first list of repetition levels
-      readPosInBytes = repetitionLevels.getNextOffset();
+      readPosInBytes = in.position();
       repetitionLevels.readInteger();
     }
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
       parentColumnReader.currDefLevel = -1;
       definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-      definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
-      readPosInBytes = definitionLevels.getNextOffset();
+      definitionLevels.initFromPage(currentPageCount, in);
+      readPosInBytes = in.position();
       if (!valueEncoding.usesDictionary()) {
         valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-        valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+        valueReader.initFromPage(currentPageCount, in);
       }
     }
-    if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+    if (valueReader == null && parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
       valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-      valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      valueReader.initFromPage(currentPageCount, in);
     }
     if (valueEncoding.usesDictionary()) {
       // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
       // actually copying the values out into the vectors
+      Preconditions.checkState(readPosInBytes < pageData.capacity());
+      int index = (int)readPosInBytes;
+      ByteBuffer byteBuffer = pageData.nioBuffer(index, pageData.capacity() - index);
       dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
-      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
       dictionaryValueReader = new DictionaryValuesReader(dictionary);
-      dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      dictionaryValueReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
       parentColumnReader.usingDictionary = true;
     } else {
       parentColumnReader.usingDictionary = false;
@@ -445,25 +450,29 @@ class PageReader {
    * @throws IOException An IO related condition
    */
   void resetDefinitionLevelReader(int skipCount) throws IOException {
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
-      throw new UnsupportedOperationException("Unsupoorted Operation");
-    }
+    Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1);
+    Preconditions.checkState(currentPageCount > 0);
 
+    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
     final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
-    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
-    final int defStartPos = repetitionLevels != null ? repetitionLevels.getNextOffset() : 0;
+
+    final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
+
+    if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+      repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+      repetitionLevels.initFromPage(currentPageCount, in);
+      repetitionLevels.readInteger();
+    }
+
     definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
     parentColumnReader.currDefLevel = -1;
 
     // Now reinitialize the underlying decoder
-    assert currentPageCount > 0 : "Page count should be strictly upper than zero";
-    definitionLevels.initFromPage(currentPageCount, pageDataBuffer, defStartPos);
+    definitionLevels.initFromPage(currentPageCount, in);
 
     // Skip values if requested by caller
     for (int idx = 0; idx < skipCount; ++idx) {
       definitionLevels.skip();
     }
   }
-
-
 }
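
[Editor's note] The PageReader changes above track the parquet-column 1.10 ValuesReader API: readers are initialized from a shared ByteBufferInputStream, and the byte offset reached after decoding the levels comes from the stream's position() rather than the removed getNextOffset(). A minimal sketch of that calling pattern; the helper name is hypothetical:

    import org.apache.parquet.bytes.ByteBufferInputStream;
    import org.apache.parquet.column.values.ValuesReader;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class InitFromPageSketch {
      // Initializes a reader over the page bytes and reports how many bytes it consumed,
      // which the caller can use as the start offset of the next section of the page.
      static long initAndPosition(ValuesReader reader, int valueCount, ByteBuffer pageBytes)
          throws IOException {
        ByteBufferInputStream in = ByteBufferInputStream.wrap(pageBytes);
        reader.initFromPage(valueCount, in);
        return in.position();
      }
    }
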
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index b6205c1..385cb83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -66,12 +66,7 @@ final class VarLenBulkPageReader {
     this.buffer.order(ByteOrder.nativeOrder());
 
     if (pageInfoInput != null) {
-      this.pageInfo.pageData = pageInfoInput.pageData;
-      this.pageInfo.pageDataOff = pageInfoInput.pageDataOff;
-      this.pageInfo.pageDataLen = pageInfoInput.pageDataLen;
-      this.pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
-      this.pageInfo.definitionLevels = pageInfoInput.definitionLevels;
-      this.pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
+      set(pageInfoInput, false);
     }
 
     this.columnPrecInfo = columnPrecInfoInput;
@@ -87,15 +82,17 @@ final class VarLenBulkPageReader {
     nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
   }
 
-  final void set(PageDataInfo pageInfoInput) {
+  final void set(PageDataInfo pageInfoInput, boolean clear) {
     pageInfo.pageData = pageInfoInput.pageData;
     pageInfo.pageDataOff = pageInfoInput.pageDataOff;
     pageInfo.pageDataLen = pageInfoInput.pageDataLen;
     pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
     pageInfo.definitionLevels = pageInfoInput.definitionLevels;
     pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
-
-    buffer.clear();
+    pageInfo.numPageValues = pageInfoInput.numPageValues;
+    if (clear) {
+      buffer.clear();
+    }
   }
 
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
@@ -160,4 +157,4 @@ final class VarLenBulkPageReader {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 8daf2cc..1b30737 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -204,7 +204,7 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
         buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback);
 
       } else {
-        buffPagePayload.set(pageInfo);
+        buffPagePayload.set(pageInfo, true);
       }
     } else {
       if (buffPagePayload == null) {
@@ -567,4 +567,4 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
   }
 
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index ab655e9..a61cc18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -39,17 +39,19 @@ import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
@@ -87,6 +89,7 @@ public class Metadata {
   public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
   public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
+  public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled";
 
   private final ParquetFormatConfig formatConfig;
 
@@ -409,9 +412,16 @@ public class Metadata {
       final FileStatus file, final FileSystem fs) throws IOException, InterruptedException {
     final ParquetMetadata metadata;
     final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+    final Configuration conf = new Configuration(fs.getConf());
+    final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
+        .useSignedStringMinMax(true)
+        .build();
     try {
-      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)
-          () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER));
+      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
+        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) {
+          return parquetFileReader.getFooter();
+        }
+      });
     } catch(Exception e) {
       logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
         file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index f208d6e..1d764b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -179,7 +179,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     int nBytes = 0;
     if (bytesToRead > 0) {
       try {
-        nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
+        nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer);
       } catch (Exception e) {
         logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
         throw new IOException((e));
@@ -193,8 +193,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
           logger.trace(
               "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
                   + "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset,
-              this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS))
-                  / 1000);
+              this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer,
+              ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
         }
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index ae09a37..ea2542e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -23,7 +23,8 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -86,12 +87,16 @@ public class DirectBufInputStream extends FilterInputStream {
     buf.clear();
     ByteBuffer directBuffer = buf.nioBuffer(0, len);
     int lengthLeftToRead = len;
+    SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream());
     while (lengthLeftToRead > 0) {
       if(logger.isTraceEnabled()) {
         logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
       }
       Stopwatch timer = Stopwatch.createStarted();
-      int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+      int bytesRead = seekableInputStream.read(directBuffer);
+      if (bytesRead < 0) {
+        return bytesRead;
+      }
       lengthLeftToRead -= bytesRead;
       if(logger.isTraceEnabled()) {
         logger.trace(
@@ -113,7 +118,7 @@ public class DirectBufInputStream extends FilterInputStream {
       b.release();
       throw e;
     }
-    if (bytesRead <= -1) {
+    if (bytesRead < 0) {
       b.release();
       return null;
     }
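
[Editor's note] The hunks in ColumnDataReader, BufferedDirectBufInputStream and DirectBufInputStream above replace the removed CompatibilityUtil.getBuf loop with Parquet's HadoopStreams, which wraps an FSDataInputStream in a SeekableInputStream offering read(ByteBuffer) and readFully(ByteBuffer). A minimal sketch of the two call shapes; the method names here are illustrative:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.parquet.hadoop.util.HadoopStreams;
    import org.apache.parquet.io.SeekableInputStream;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class HadoopStreamsSketch {
      // Fills the buffer completely (throws EOFException if the stream ends first).
      static void loadFully(FSDataInputStream input, ByteBuffer target) throws IOException {
        HadoopStreams.wrap(input).readFully(target);
      }

      // Single read: returns the number of bytes read, or -1 at end of stream,
      // so callers that loop must check for a negative return value.
      static int readSome(FSDataInputStream input, ByteBuffer target) throws IOException {
        SeekableInputStream in = HadoopStreams.wrap(input);
        return in.read(target);
      }
    }
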
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 6e9db7e..89731ff 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -46,7 +46,7 @@ import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 import io.netty.buffer.ByteBuf;
 
@@ -163,12 +163,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
               lastPage = buf;
               ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              int lengthLeftToRead = pageHeader.compressed_page_size;
-              while (lengthLeftToRead > 0) {
-                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
-              }
+              HadoopStreams.wrap(in).readFully(buffer);
+              buffer.flip();
               return new DataPageV1(
-                      decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                      decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()),
                       pageHeader.data_page_header.num_values,
                       pageHeader.uncompressed_page_size,
                       fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
@@ -182,28 +180,33 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               buf = allocator.buffer(pageHeader.compressed_page_size);
               lastPage = buf;
               buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              lengthLeftToRead = pageHeader.compressed_page_size;
-              while (lengthLeftToRead > 0) {
-                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
-              }
+              HadoopStreams.wrap(in).readFully(buffer);
+              buffer.flip();
               DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
               int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
               BytesInput decompressedPageData =
                   decompressor.decompress(
-                      BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
+                      BytesInput.from(buffer),
                       pageHeader.uncompressed_page_size);
+              ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
+              int limit = byteBuffer.limit();
+              byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
+              BytesInput repetitionLevels = BytesInput.from(byteBuffer.slice());
+              byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length());
+              byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+              BytesInput definitionLevels = BytesInput.from(byteBuffer.slice());
+              byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+              byteBuffer.limit(limit);
+              BytesInput data = BytesInput.from(byteBuffer.slice());
+
               return new DataPageV2(
                       dataHeaderV2.getNum_rows(),
                       dataHeaderV2.getNum_nulls(),
                       dataHeaderV2.getNum_values(),
-                      BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length(),
-                          dataHeaderV2.getDefinition_levels_byte_length()),
+                      repetitionLevels,
+                      definitionLevels,
                       parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
-                          dataSize),
+                      data,
                       uncompressedPageSize,
                       fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
                       dataHeaderV2.isIs_compressed()
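
[Editor's note] The DataPageV2 branch above now splits one decompressed buffer, laid out as [repetition levels][definition levels][data], into three zero-copy slices instead of using the earlier offset/length overload of BytesInput.from. A minimal sketch of that slicing with position/limit and ByteBuffer.slice(); the helper is illustrative and assumes the buffer's position starts at 0:

    import org.apache.parquet.bytes.BytesInput;

    import java.nio.ByteBuffer;

    public class PageV2SliceSketch {
      // Splits a decompressed v2 page buffer into repetition levels, definition levels and data.
      static BytesInput[] split(ByteBuffer page, int repLevelsLength, int defLevelsLength) {
        int limit = page.limit();
        page.limit(repLevelsLength);
        BytesInput repetitionLevels = BytesInput.from(page.slice());
        page.position(repLevelsLength);
        page.limit(repLevelsLength + defLevelsLength);
        BytesInput definitionLevels = BytesInput.from(page.slice());
        page.position(repLevelsLength + defLevelsLength);
        page.limit(limit);
        BytesInput data = BytesInput.from(page.slice());
        return new BytesInput[] {repetitionLevels, definitionLevels, data};
      }
    }
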
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 93f9920..0ed2245 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.parquet.hadoop;
 
-import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -119,7 +117,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
       this.path = path;
       this.compressor = compressor;
       this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
-      this.totalStatistics = getStatsBasedOnType(this.path.getType());
+      this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
     }
 
     @Override
@@ -226,11 +224,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
         writer.writeDictionaryPage(dictionaryPage);
         // tracking the dictionary encoding is handled in writeDictionaryPage
       }
-      List<Encoding> encodings = Lists.newArrayList();
-      encodings.addAll(rlEncodings);
-      encodings.addAll(dlEncodings);
-      encodings.addAll(dataEncodings);
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings);
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings);
       writer.endColumn();
       logger.debug(
           String.format(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 50e679a..1da2530 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -30,6 +30,7 @@ import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -737,6 +738,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalDayPartitionPruning() throws Exception {
     final String intervalDayPartitionTable = "dfs.tmp.`interval_day_partition`";
@@ -762,6 +764,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalYearPartitionPruning() throws Exception {
     final String intervalYearPartitionTable = "dfs.tmp.`interval_yr_partition`";
@@ -812,6 +815,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Ignore // Statistics for DECIMAL is not available (see PARQUET-1322).
   @Test // DRILL-4139
   public void testDecimalPartitionPruning() throws Exception {
     List<String> ctasQueries = Lists.newArrayList();
diff --git a/pom.xml b/pom.xml
index 6078dc7..242b134 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.8.1-drill-r0</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
     <calcite.version>1.16.0-drill-r3</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
@@ -1522,6 +1522,36 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-format</artifactId>
+        <version>2.5.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-common</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -2040,6 +2070,14 @@
           	<artifactId>parquet-hadoop</artifactId>
           	<version>${parquet.version}</version>
           	<exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+              </exclusion>
           		<exclusion>
           			<groupId>org.xerial.snappy</groupId>
           			<artifactId>snappy-java</artifactId>

-- 
To stop receiving notification emails like this one, please contact
timothyfarkas@apache.org.
