From: amansinha@apache.org
To: commits@drill.apache.org
Date: Sat, 22 Oct 2016 00:33:09 -0000
Subject: [4/5] drill git commit: DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the numbers of records read

DRILL-4905: Push down the LIMIT to the parquet reader scan to limit the numbers of records read

close apache/drill#597


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2c43535a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2c43535a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2c43535a

Branch: refs/heads/master
Commit: 2c43535adc59c5995e2ebe80df570fad9ba1aab3
Parents: 4efc9f2
Author: Padma Penumarthy
Authored: Mon Oct 17 16:46:51 2016 -0700
Committer: Aman Sinha
Committed: Fri Oct 21 16:01:03 2016 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetGroupScan.java    | 69 +++++++++++++++-----
 .../store/parquet/ParquetScanBatchCreator.java  |  2 +-
 .../exec/store/parquet/RowGroupReadEntry.java   | 10 ++-
 .../columnreaders/ParquetRecordReader.java      | 38 ++++++++++-
 .../store/parquet/ParquetRecordReaderTest.java  | 65 +++++++++++++++++-
 5 files changed, 163 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index b9f0ac0..a8e55b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -521,6 +521,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     private int rowGroupIndex;
     private String root;
     private long rowCount; // rowCount = -1 indicates to include all rows.
+    private long numRecordsToRead;
 
     @JsonCreator
     public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
@@ -528,10 +529,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       super(path, start, length);
       this.rowGroupIndex = rowGroupIndex;
       this.rowCount = rowCount;
+      this.numRecordsToRead = rowCount;
     }
 
     public RowGroupReadEntry getRowGroupReadEntry() {
-      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(),
+                                   this.rowGroupIndex, this.getNumRecordsToRead());
     }
 
     public int getRowGroupIndex() {
@@ -553,6 +556,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       return byteMap;
     }
 
+    public long getNumRecordsToRead() {
+      return numRecordsToRead;
+    }
+
+    public void setNumRecordsToRead(long numRecords) {
+      numRecordsToRead = numRecords;
+    }
+
     public void setEndpointByteMap(EndpointByteMap byteMap) {
       this.byteMap = byteMap;
     }
@@ -834,7 +845,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
     List<RowGroupReadEntry> entries = Lists.newArrayList();
     for (RowGroupInfo rgi : rowGroups) {
-      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex());
+      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead());
       entries.add(entry);
     }
     return entries;
@@ -867,6 +878,10 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return toString();
   }
 
+  public void setCacheFileRoot(String cacheFileRoot) {
+    this.cacheFileRoot = cacheFileRoot;
+  }
+
   @Override
   public String toString() {
     String cacheFileString = "";
@@ -893,15 +908,44 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return newScan;
   }
 
+  // Based on maxRecords to read for the scan,
+  // figure out how many rowGroups to read and update number of records to read for each of them.
+  // Returns total number of rowGroups to read.
+  private int updateRowGroupInfo(long maxRecords) {
+    long count = 0;
+    int index = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      long rowCount = rowGroupInfo.getRowCount();
+      if (count + rowCount <= maxRecords) {
+        count += rowCount;
+        rowGroupInfo.setNumRecordsToRead(rowCount);
+        index++;
+        continue;
+      } else if (count < maxRecords) {
+        rowGroupInfo.setNumRecordsToRead(maxRecords - count);
+        index++;
+      }
+      break;
+    }
+
+    return index;
+  }
+
   @Override
-  public FileGroupScan clone(FileSelection selection) throws IOException {
+  public ParquetGroupScan clone(FileSelection selection) throws IOException {
     ParquetGroupScan newScan = new ParquetGroupScan(this);
     newScan.modifyFileSelection(selection);
-    newScan.cacheFileRoot = selection.cacheFileRoot;
+    newScan.setCacheFileRoot(selection.cacheFileRoot);
     newScan.init(selection.getMetaContext());
     return newScan;
   }
 
+  public ParquetGroupScan clone(FileSelection selection, long maxRecords) throws IOException {
+    ParquetGroupScan newScan = clone(selection);
+    newScan.updateRowGroupInfo(maxRecords);
+    return newScan;
+  }
+
   @Override
   public boolean supportsLimitPushdown() {
     return true;
@@ -913,22 +957,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
     // further optimization : minimize # of files chosen, or the affinity of files chosen.
-    long count = 0;
-    int index = 0;
-    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
-      if (count < maxRecords) {
-        count += rowGroupInfo.getRowCount();
-        index ++;
-      } else {
-        break;
-      }
-    }
+
+    // Calculate number of rowGroups to read based on maxRecords and update
+    // number of records to read for each of those rowGroups.
+    int index = updateRowGroupInfo(maxRecords);
 
     Set<String> fileNames = Sets.newHashSet(); // HashSet keeps a fileName unique.
     for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) {
       fileNames.add(rowGroupInfo.getPath());
     }
 
+    // If there is no change in fileSet, no need to create new groupScan.
     if (fileNames.size() == fileSet.size() ) {
       // There is no reduction of rowGroups. Return the original groupScan.
       logger.debug("applyLimit() does not apply!");
@@ -938,7 +977,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     try {
       FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false);
       logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size());
-      return this.clone(newSelection);
+      return this.clone(newSelection, maxRecords);
     } catch (IOException e) {
       logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
       return null;


http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index bf13977..a98c660 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -111,7 +111,7 @@ public class ParquetScanBatchCreator implements BatchCreator

[The body of this hunk, the RowGroupReadEntry.java diff, and the head of the ParquetRecordReader.java diff were lost where the archive stripped angle-bracketed text; the surviving content resumes partway into the ParquetRecordReader.java diff.]

 > columnStatuses;
   private FileSystem fileSystem;
   private long batchSize;
+  private long numRecordsToRead; // number of records to read
+
   Path hadoopPath;
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
@@ -117,19 +120,34 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
   public ParquetRecordReader(FragmentContext fragmentContext,
       String path,
       int rowGroupIndex,
+      long numRecordsToRead,
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
       throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
-        columns, dateCorruptionStatus);
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
+        path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+  }
+
+  public ParquetRecordReader(FragmentContext fragmentContext,
+      String path,
+      int rowGroupIndex,
+      FileSystem fs,
+      CodecFactory codecFactory,
+      ParquetMetadata footer,
+      List<SchemaPath> columns,
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
+      throws ExecutionSetupException {
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+        path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
   }
 
   public ParquetRecordReader(
       FragmentContext fragmentContext,
       long batchSize,
+      long numRecordsToRead,
       String path,
       int rowGroupIndex,
       FileSystem fs,
@@ -145,6 +163,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
     this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
     this.fragmentContext = fragmentContext;
+    // Callers can pass -1 if they want to read all rows.
+    if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+      this.numRecordsToRead = footer.getBlocks().get(rowGroupIndex).getRowCount();
+    } else {
+      assert (numRecordsToRead >= 0);
+      this.numRecordsToRead = Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+    }
     setColumns(columns);
   }
 
@@ -444,11 +469,16 @@ public class ParquetRecordReader extends AbstractRecordReader {
         return 0;
       }
       recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
+
+      // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit).
+      recordsToRead = Math.min(recordsToRead, numRecordsToRead);
+
       for (final ValueVector vv : nullFilledVectors ) {
         vv.getMutator().setValueCount( (int) recordsToRead);
       }
       mockRecordsRead += recordsToRead;
       totalRecordsRead += recordsToRead;
+      numRecordsToRead -= recordsToRead;
       return (int) recordsToRead;
     }
 
@@ -459,6 +489,9 @@ public class ParquetRecordReader extends AbstractRecordReader {
       }
 
+      // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit)
+      recordsToRead = Math.min(recordsToRead, numRecordsToRead);
+
       if (allFieldsFixedLength) {
         readAllFixedFields(recordsToRead);
       } else { // variable length columns
@@ -476,6 +509,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
 //      logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
       totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
+      numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (Exception e) {
       handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +


http://git-wip-us.apache.org/repos/asf/drill/blob/2c43535a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 51fa45c..6f3a19a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -637,7 +637,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final FileSystem fs = new CachedSingleFileSystem(fileName);
     final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
     for(int i = 0; i < 25; i++) {
-      final ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
+      final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs,
          CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0),
          f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION);
       final TestOutputMutator mutator = new TestOutputMutator(allocator);
@@ -691,4 +691,67 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final long D = System.nanoTime();
     System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));
   }
+
+  @Test
+  public void testLimit() throws Exception {
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 1");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 1, recordsInOutput), 1 == recordsInOutput);
+  }
+
+  @Test
+  public void testLimitBeyondRowCount() throws Exception {
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM cp.`/parquet/tpch/nation/01.parquet` LIMIT 100");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 9, recordsInOutput), 9 == recordsInOutput);
+  }
+
+  @Test
+  public void testLimitMultipleRowGroups() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
+    populateFieldInfoMap(props);
+    TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
+
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 225");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 225, recordsInOutput), 225 == recordsInOutput);
+  }
+
+  @Test
+  public void testLimitMultipleRowGroupsBeyondRowCount() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(3, 100, 1024 * 1024, fields);
+    populateFieldInfoMap(props);
+    TestFileGenerator.generateParquetFile("/tmp/testLimit.parquet", props);
+
+    List<QueryDataBatch> results = testSqlWithResults("SELECT * FROM dfs.`/tmp/testLimit.parquet` LIMIT 500");
+
+    int recordsInOutput = 0;
+    for (QueryDataBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: expected=%d, actual=%s", 300, recordsInOutput), 300 == recordsInOutput);
+  }
+
 }
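----------------------------------------------------------------------

For readers following the patch, the core of the pushdown is the rowgroup-selection pass added as ParquetGroupScan.updateRowGroupInfo(): walk the rowgroups in order, keep whole rowgroups while their cumulative row count stays within the limit, cap the rowgroup that crosses the limit, and skip the rest. The standalone Java sketch below mirrors that logic; the RowGroup class and the names used here are illustrative stand-ins, not Drill's actual types.

import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the rowgroup selection behind the LIMIT pushdown.
public class LimitPushdownSketch {

  // Illustrative stand-in for Drill's RowGroupInfo: a row count plus a per-rowgroup cap.
  static final class RowGroup {
    final long rowCount;
    long numRecordsToRead;

    RowGroup(long rowCount) {
      this.rowCount = rowCount;
      this.numRecordsToRead = rowCount; // default: read the whole rowgroup
    }
  }

  // Mirrors the shape of updateRowGroupInfo(maxRecords): returns how many rowgroups
  // must be scanned and sets numRecordsToRead on each of them.
  static int updateRowGroupInfo(List<RowGroup> rowGroups, long maxRecords) {
    long count = 0;
    int index = 0;
    for (RowGroup rg : rowGroups) {
      if (count + rg.rowCount <= maxRecords) {
        count += rg.rowCount;
        rg.numRecordsToRead = rg.rowCount;          // whole rowgroup fits under the limit
        index++;
      } else {
        if (count < maxRecords) {
          rg.numRecordsToRead = maxRecords - count; // partial read of the last rowgroup
          index++;
        }
        break;                                      // remaining rowgroups are skipped
      }
    }
    return index;
  }

  public static void main(String[] args) {
    // Three rowgroups of 100 rows each, as in testLimitMultipleRowGroups above.
    List<RowGroup> rowGroups = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      rowGroups.add(new RowGroup(100));
    }

    // LIMIT 225 -> scan 3 rowgroups, reading 100, 100 and 25 records respectively.
    int toScan = updateRowGroupInfo(rowGroups, 225);
    System.out.println("rowgroups to scan: " + toScan);
    for (RowGroup rg : rowGroups) {
      System.out.println("numRecordsToRead = " + rg.numRecordsToRead);
    }
  }
}

In the patch itself, the per-rowgroup cap computed this way is carried through RowGroupReadEntry into ParquetRecordReader, which clamps each read pass to the remaining numRecordsToRead and decrements it as records are returned.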