drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] KazydubB commented on a change in pull request #1640: DRILL-7038: Queries on partitioned columns scan the entire datasets
Date Fri, 01 Mar 2019 12:42:13 GMT
KazydubB commented on a change in pull request #1640: DRILL-7038: Queries on partitioned columns
scan the entire datasets
URL: https://github.com/apache/drill/pull/1640#discussion_r261588501
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
 ##########
 @@ -550,4 +567,211 @@ private static void setPruneStatus(MetadataContext metaContext, PruneStatus
prun
     }
   }
 
+  private static class PruneFilesOnScanRule extends PruneScanRule {
+
+    private final Pattern dirPattern;
+
+    private PruneFilesOnScanRule(OptimizerRulesContext optimizerRulesContext) {
+      super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(TableScan.class)),
+          "PruneFilesOnScanRule:Prune_On_Scan", optimizerRulesContext);
+      String partitionColumnLabel = optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
+      dirPattern = Pattern.compile(partitionColumnLabel + "\\d+");
+    }
+
+    @Override
+    public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan
scanRel) {
+      return new FileSystemPartitionDescriptor(settings, scanRel);
+    }
+
+    // Checks if query references directory columns only and has DISTINCT or GROUP BY operation
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      Aggregate aggregate = call.rel(0);
+      TableScan scan = call.rel(1);
+
+      if (!isQualifiedFilePruning(scan)
+          || scan.getRowType().getFieldCount() != aggregate.getRowType().getFieldCount())
{
+        return false;
+      }
+
+      List<String> fieldNames = scan.getRowType().getFieldNames();
+      // Check if select contains partition columns (dir0, dir1, dir2,..., dirN) only
+      for (String field : fieldNames) {
+        if (!dirPattern.matcher(field).matches()) {
+          return false;
+        }
+      }
+
+      return scan.isDistinct() || aggregate.getGroupCount() > 0;
+    }
+
+    /*
+      Transforms Scan node to DrillValuesRel node to avoid unnecessary scanning of selected
files.
+      If cache metadata directory file exists, directory columns will be read from it,
+      otherwise directories will be gathered from selection (PartitionLocations).
+      DrillValuesRel will contain gathered constant literals.
+
+      For example, plan for "select dir0, dir1 from `t` group by 1, 2", where table `t` has
directory structure year/quarter
+
+      00-00    Screen
+      00-01      Project(dir0=[$0], dir1=[$1])
+      00-02        HashAgg(group=[{0, 1}])
+      00-03          Scan(table=[[t]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath
[path=file:/path/t/1996/Q4/orders_96_q4.parquet],
+        ReadEntryWithPath [path=file:/path/t/1996/Q1/file_96_q1.parquet], ReadEntryWithPath
[path=file:/path/t/1996/Q3/file_96_q3.parquet],
+        ReadEntryWithPath [path=file:/path/t/1996/Q2/file_96_q2.parquet], ReadEntryWithPath
[path=file:/path/t/1994/Q4/file_94_q4.parquet],
+        ReadEntryWithPath [path=file:/path/t/1994/Q1/file_94_q1.parquet], ReadEntryWithPath
[path=file:/path/t/1994/Q3/file_94_q3.parquet],
+        ReadEntryWithPath [path=file:/path/t/1994/Q2/file_94_q2.parquet], ReadEntryWithPath
[path=file:/path/t/1995/Q4/file_95_q4.parquet],
+        ReadEntryWithPath [path=file:/path/t/1995/Q1/file_95_q1.parquet], ReadEntryWithPath
[path=file:/path/t/1995/Q3/file_95_q3.parquet],
+        ReadEntryWithPath [path=file:/path/t/1995/Q2/file_95_q2.parquet]], selectionRoot=file:/path/t,
..., columns=[`dir0`, `dir1`]]])
+
+      will be changed to
+
+      00-00    Screen
+      00-01      Project(dir0=[$0], dir1=[$1])
+      00-02        HashAgg(group=[{0, 1}])
+      00-03          Values(tuples=[[{ '1995', 'Q1' }, { '1994', 'Q4' }, { '1996', 'Q3' },
{ '1996', 'Q2' }, { '1994', 'Q2' },
+        { '1995', 'Q4' }, { '1996', 'Q1' }, { '1995', 'Q3' }, { '1996', 'Q4' }, { '1994',
'Q3' }, { '1994', 'Q1' }, { '1995', 'Q2' }]])
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      TableScan scan = call.rel(1);
+
+      String pruningClassName = getClass().getName();
+      logger.debug("Beginning file partition pruning, pruning class: {}", pruningClassName);
+      Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() :
null;
+
+      Object selection = getDrillTable(scan).getSelection();
+      MetadataContext metaContext = null;
+      FileSelection fileSelection = null;
+      if (selection instanceof FormatSelection) {
+        fileSelection = ((FormatSelection) selection).getSelection();
+        metaContext = fileSelection.getMetaContext();
+      }
+
+      PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+      PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);
+
+      List<String> fieldNames = scan.getRowType().getFieldNames();
+      List<String> values = Collections.emptyList();
+      List<Integer> indexes = new ArrayList<>(fieldNames.size());
+      for (String field : fieldNames) {
+        int index = descriptor.getPartitionHierarchyIndex(field);
+        indexes.add(index);
+      }
+
+      if (metaContext != null && metaContext.getDirectories() != null) {
+        // Dir metadata cache file exists
+        logger.debug("Using Metadata Directories cache");
+        values = getValues(fileSelection.getSelectionRoot(), metaContext.getDirectories(),
indexes);
+      }
+
+      if (values.isEmpty()) {
+        logger.debug("Not using Metadata Directories cache");
+        int batchIndex = 0;
+        // Outer loop: iterate over a list of batches of PartitionLocations
+        values = new ArrayList<>();
+        for (List<PartitionLocation> partitions : descriptor) {
+          logger.debug("Evaluating file partition pruning for batch {}", batchIndex);
+
+          try {
+            values.addAll(getValues(partitions, indexes));
+          } catch (Exception e) {
+            logger.warn("Exception while trying to prune files.", e);
+            if (totalPruningTime != null) {
+              logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+            }
+
+            // continue without partition pruning
+            return;
+          }
+          batchIndex++;
+        }
+
+        if (values.isEmpty()) {
+          // No changes are required
+          return;
+        }
+      }
+
+      try {
+        // Transform Scan node to DrillValuesRel node
+        List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
+        RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
+
+        int i = 0;
+        for (String field : fieldNames) {
+          RelDataType dataType = typeFactory.createTypeWithNullability(
+              typeFactory.createSqlType(SqlTypeName.VARCHAR, Types.MAX_VARCHAR_LENGTH), true);
+          typeFields.add(new RelDataTypeFieldImpl(field, i++, dataType));
+        }
+        RelRecordType t = new RelRecordType(scan.getRowType().getStructKind(), typeFields);
+        RelNode newInput = DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
+            .values(t, values.toArray())
+            .build();
+
+        RelTraitSet traits = newInput.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+        newInput = new DrillValuesRel(
+            newInput.getCluster(),
+            newInput.getRowType(),
+            ((LogicalValues) newInput).getTuples(), traits
+        );
+
+        Aggregate aggregate = call.rel(0);
+        Aggregate newAggregate = aggregate.copy(
+            aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+            newInput,
+            aggregate.indicator,
+            aggregate.getGroupSet(),
+            aggregate.getGroupSets(),
+            aggregate.getAggCallList()
+        );
+        call.transformTo(newAggregate);
+      } catch (Exception e) {
+        logger.warn("Exception while using the pruned partitions.", e);
+      } finally {
+        if (totalPruningTime != null) {
+          logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+        }
+      }
+    }
+
+    private List<String> getValues(String selectionRoot, List<String> directories,
List<Integer> indexes) {
+      List<String> values = new ArrayList<>();
+      for (String dir : directories) {
 
 Review comment:
   Thanks for suggestion but I find it harder to read.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message