drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [drill] amansinha100 commented on a change in pull request #1738: DRILL-7062: Initial implementation of run-time row-group pruning
Date Mon, 08 Apr 2019 02:31:52 GMT
amansinha100 commented on a change in pull request #1738: DRILL-7062: Initial implementation
of run-time row-group pruning
URL: https://github.com/apache/drill/pull/1738#discussion_r272862031
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 ##########
 @@ -68,76 +83,131 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-      Here we could store a map from file names to footers, to prevent re-reading the footer
for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader (it is read
earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be populated upon the
first read to
provide the reader with all of the file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
-        Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
-        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
-        if (!footers.containsKey(rowGroup.getPath())) {
-          if (timer != null) {
-            timer.start();
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row groups are pruned
out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      Path selectionRoot = rowGroupScan.getSelectionRoot();
+      // Runtime pruning: Avoid recomputing metadata objects for each row-group in case they
use the same file
+      // by keeping the following objects computed earlier (relies on same file being in
consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V3.ParquetTableMetadata_v3 tableMetadataV3 = null;
+      Metadata_V3.ParquetFileMetadata_v3 fileMetadataV3 = null;
+      FileSelection fileSelection = null;
+      ParquetTableMetadataProviderImpl metadataProvider = null;
+
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
+        /*
+        Here we could store a map from file names to footers, to prevent re-reading the footer
for each row group in a file
+        TODO - to prevent reading the footer again in the parquet record reader (it is read
earlier in the ParquetStorageEngine)
+        we should add more information to the RowGroupInfo that will be populated upon the
first read to
+        provide the reader with all of the file meta-data it needs
+        These fields will be added to the constructor below
+        */
+
+          Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
+          DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
+          if (!footers.containsKey(rowGroup.getPath())) {
+            if (timer != null) {
+              timer.start();
+            }
+
+            ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
+            if (timer != null) {
+              long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+              logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(),
"", 0, 0, 0, timeToRead);
+            }
+            footers.put(rowGroup.getPath(), footer);
           }
+          ParquetMetadata footer = footers.get(rowGroup.getPath());
+
+          //
+          //   If a filter is given (and it is not just "TRUE") - then use it to perform
run-time pruning
+          //
+          if ( filterExpr != null && ! (filterExpr instanceof ValueExpressions.BooleanExpression)
 ) { // skip when no filter or filter is TRUE
+
+            int rowGroupIndex = rowGroup.getRowGroupIndex();
+            long footerRowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+            if ( timer != null ) {  // restart the timer, if tracing
+              timer.reset();
+              timer.start();
+            }
+
+            // When starting a new file, or at the first time - Initialize path specific
metadata etc
+            if ( ! rowGroup.getPath().equals(prevRowGroupPath) ) {
+              // Get the table metadata (V3)
+              tableMetadataV3 = Metadata.getParquetTableMetadata(footer, fs, rowGroup.getPath().toString(),
readerConfig);
+
+              // The file status for this file
+              FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+              List<FileStatus> listFileStatus = new ArrayList<>(Arrays.asList(fileStatus));
+              List<Path> listRowGroupPath = new ArrayList<>(Arrays.asList(rowGroup.getPath()));
+              List<ReadEntryWithPath> entries = new ArrayList<>(Arrays.asList(new
ReadEntryWithPath(rowGroup.getPath())));
+              fileSelection = new FileSelection(listFileStatus, listRowGroupPath, selectionRoot);
+
+              metadataProvider = new ParquetTableMetadataProviderImpl(entries, selectionRoot,
fileSelection.cacheFileRoot, readerConfig, fs,false);
+              // The file metadata (for all columns)
+              fileMetadataV3 = Metadata.getParquetFileMetadata_v3(tableMetadataV3, footer,
fileStatus, fs, true, null, readerConfig);
+
+              prevRowGroupPath = rowGroup.getPath(); // for next time
+            }
+
+            MetadataBase.RowGroupMetadata rowGroupMetadata = fileMetadataV3.getRowGroups().get(rowGroup.getRowGroupIndex());
+
+            Map<SchemaPath, ColumnStatistics> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV3,
rowGroupMetadata);
+
+            List<SchemaPath> columns = columnsStatistics.keySet().stream().collect(Collectors.toList());
+
+            ParquetGroupScan parquetGroupScan = new ParquetGroupScan( context.getQueryUserName(),
metadataProvider, fileSelection, columns, readerConfig, filterExpr);
+
+            FilterPredicate filterPredicate = parquetGroupScan.getFilterPredicate(filterExpr,
context, (FunctionImplementationRegistry) context.getFunctionRegistry(),
+              context.getOptions(), true);
 
-          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
-          if (timer != null) {
-            long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
-            logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(),
"", 0, 0, 0, timeToRead);
+            //
+            // Perform the Run-Time Pruning - i.e. Skip this rowgroup if the match fails
+            //
+            RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics,
footerRowCount);
+            if (timer != null) { // if tracing
+              long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+              logger.trace("Run-time pruning: {} row-group {} (RG index: {} row count: {}),
took {} usec", match == RowsMatch.NONE ? "Excluded" : "Included", rowGroup.getPath(),
+                rowGroupIndex, footerRowCount, timeToRead);
+            }
+            if (match == RowsMatch.NONE) {
+              rowgroupsPruned++; // one more RG was pruned
+              if (firstRowGroup == null) {  // keep first RG, to be used in case all row
groups are pruned
+                firstRowGroup = rowGroup;
+                firstFooter = footer;
+              }
+              continue; // This Row group does not comply with the filter - prune it out
and check the next Row Group
+            }
           }
-          footers.put(rowGroup.getPath(), footer);
-        }
-        ParquetMetadata footer = footers.get(rowGroup.getPath());
-
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
-          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
-        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
-
-        boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
-        boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer,
rowGroupScan.getColumns());
-        logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", useNewReader
? "enabled" : "disabled",
-            containsComplexColumn ? "found." : "not found.");
-        RecordReader reader;
-
-        if (useNewReader || containsComplexColumn) {
-          reader = new DrillParquetReader(context,
-              footer,
-              rowGroup,
-              columnExplorer.getTableColumns(),
-              fs,
-              containsCorruptDates);
-        } else {
-          reader = new ParquetRecordReader(context,
-              rowGroup.getPath(),
-              rowGroup.getRowGroupIndex(),
-              rowGroup.getNumRecordsToRead(),
-              fs,
-              CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()),
0),
-              footer,
-              rowGroupScan.getColumns(),
-              containsCorruptDates);
-        }
 
-        logger.debug("Query {} uses {}",
-            QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
-            reader.getClass().getSimpleName());
-        readers.add(reader);
+          mapWithMaxColumns = createReaderAndImplicitColumns(context, rowGroupScan, oContext,
columnExplorer, readers, implicitColumns, mapWithMaxColumns, rowGroup,
+           fs, footer, false);
+      }
 
-        List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
-        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(),
partitionValues, rowGroupScan.supportsFileImplicitColumns());
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
-        }
+      // in case all row groups were pruned out - create a single reader for the first one
(so that the schema could be returned)
+      if ( readers.size() == 0 && firstRowGroup != null ) {
+        logger.trace("All row groups were pruned out. Returning the first: {} (row count
{}) for its schema", firstRowGroup.getPath(), firstRowGroup.getNumRecordsToRead());
+        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(firstRowGroup), firstRowGroup.getPath());
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, rowGroupScan, oContext,
columnExplorer, readers, implicitColumns, mapWithMaxColumns, firstRowGroup, fs,
+          firstFooter, true);
+      }
 
-      } catch (IOException e) {
-        throw new ExecutionSetupException(e);
+      // Update stats (same in every reader - the others would just overwrite the stats)
+      for (RecordReader rr : readers ) {
+        if ( rr instanceof ParquetRecordReader ) {
+          ((ParquetRecordReader) rr).updateRowgroupsStats(rowGroupScan.getRowGroupReadEntries().size(),
rowgroupsPruned);
+        }
       }
+
+    } catch (IOException|InterruptedException e) {
 
 Review comment:
   Which of the new calls generates an `InterruptedException`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message