drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [drill] vvysotskyi commented on a change in pull request #1736: DRILL-7064: Leverage the summary metadata for plain COUNT aggregates.
Date Sun, 07 Apr 2019 10:24:00 GMT
vvysotskyi commented on a change in pull request #1736: DRILL-7064: Leverage the summary metadata
for plain COUNT aggregates.
URL: https://github.com/apache/drill/pull/1736#discussion_r272827595
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
 ##########
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.planner.common.CountToDirectScanUtils;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.direct.MetadataDirectGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Set;
+
+/**
+ * <p> This rule is a logical planning counterpart to a corresponding <b>ConvertCountToDirectScanPrule</b>
+ * physical rule
+ * </p>
+ * <p>
+ * This rule will convert <b>" select count(*)  as mycount from table "</b>
+ * or <b>" select count(not-nullable-expr) as mycount from table "</b> into
+ * <pre>
+ *    Project(mycount)
+ *         \
+ *    DirectGroupScan ( PojoRecordReader ( rowCount ))
+ *</pre>
+ * or <b>" select count(column) as mycount from table "</b> into
+ * <pre>
+ *      Project(mycount)
+ *           \
+ *            DirectGroupScan (PojoRecordReader (columnValueCount))
+ *</pre>
+ * Rule can be applied if query contains multiple count expressions.
+ * <b>" select count(column1), count(column2), count(*) from table "</b>
+ * </p>
+ *
+ * <p>
+ * The rule utilizes the Parquet Metadata Cache's summary information to retrieve the total
row count
+ * and the per-column null count.  As such, the rule is only applicable for Parquet tables
and only if the
+ * metadata cache has been created with the summary information.
+ * </p>
+ */
+public class ConvertCountToDirectScanRule extends RelOptRule {
+
+  public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new ConvertCountToDirectScanRule(
+      RelOptHelper.some(Aggregate.class,
+                        RelOptHelper.some(Project.class,
+                            RelOptHelper.any(TableScan.class))), "Agg_on_proj_on_scan:logical");
+
+  public static final RelOptRule AGG_ON_SCAN = new ConvertCountToDirectScanRule(
+      RelOptHelper.some(Aggregate.class,
+                            RelOptHelper.any(TableScan.class)), "Agg_on_scan:logical");
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
+
+  protected ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
+    super(rule, "ConvertCountToDirectScan:logical:" + id);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate agg = (Aggregate) call.rel(0);
+    final TableScan scan = (TableScan) call.rel(call.rels.length - 1);
+    final Project project = call.rels.length == 3 ? (Project) call.rel(1) : null;
+
+    // Qualifying conditions for rule:
+    //    1) There's no GroupBY key,
+    //    2) Agg is not a DISTINCT agg
+    //    3) Additional checks are done further below ..
+    if (agg.getGroupCount() > 0 ||
+        agg.containsDistinctCall()) {
+      return;
+    }
+
+    DrillTable drillTable = DrillRelOptUtil.getDrillTable(scan);
+
+    if (drillTable == null) {
+      logger.debug("Rule does not apply since an eligible drill table instance was not found.");
+      return;
+    }
+
+    Object selection = drillTable.getSelection();
+
+    if (!(selection instanceof FormatSelection)) {
+      logger.debug("Rule does not apply since only Parquet file format is eligible.");
+      return;
+    }
+
+    //  Rule is applicable only if the statistics for row count and null count are available
from the metadata,
+    FormatSelection formatSelection = (FormatSelection) selection;
+    Pair<Boolean, Metadata_V4.MetadataSummary> status = checkMetadataForScanStats(drillTable,
formatSelection);
+
+    if (!status.getLeft()) {
+      logger.debug("Rule does not apply since MetadataSummary metadata was not found.");
+      return;
+    }
+
+    PlannerSettings settings = call.getPlanner().getContext().unwrap(PlannerSettings.class);
+    Metadata_V4.MetadataSummary metadataSummary = status.getRight();
+    Map<String, Long> result = collectCounts(settings, metadataSummary, agg, scan,
project);
+    logger.trace("Calculated the following aggregate counts: ", result);
+
+    // if counts could not be determined, rule won't be applied
+    if (result.isEmpty()) {
+      logger.debug("Rule does not apply since one or more COUNTs could not be determined
from metadata.");
+      return;
+    }
+
+    List<Path> fileList =
+            ImmutableList.of(Metadata.getSummaryFileName(formatSelection.getSelection().getSelectionRoot()));
+
+    final RelDataType scanRowType = CountToDirectScanUtils.constructDataType(agg, result.keySet());
+
+    final DynamicPojoRecordReader<Long> reader = new DynamicPojoRecordReader<>(
+        CountToDirectScanUtils.buildSchema(scanRowType.getFieldNames()),
+        Collections.singletonList((List<Long>) new ArrayList<>(result.values())));
+
+    final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT,
1, 1, scanRowType.getFieldCount());
+    final MetadataDirectGroupScan directScan = new MetadataDirectGroupScan(reader, fileList,
scanStats, true);
+
+    final DrillDirectScanRel newScan = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+      directScan, scanRowType);
+
+    final DrillProjectRel newProject = new DrillProjectRel(agg.getCluster(), agg.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+      newScan, CountToDirectScanUtils.prepareFieldExpressions(scanRowType), agg.getRowType());
+
+    call.transformTo(newProject);
+  }
+
+  private Pair<Boolean, Metadata_V4.MetadataSummary> checkMetadataForScanStats(DrillTable
drillTable, FormatSelection formatSelection) {
+
+    // Currently only support metadata rowcount stats for Parquet tables
+    FormatPluginConfig formatConfig = formatSelection.getFormat();
+    if (!((formatConfig instanceof ParquetFormatConfig)
+      || ((formatConfig instanceof NamedFormatPluginConfig)
+      && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+      return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+    }
+
+    FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
+    DrillFileSystem fs = null;
+    try {
+       fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+    } catch (IOException e) {
+      logger.warn("Unable to create the file system object for retrieving statistics from
metadata cache file ", e);
+      return new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+    }
+
+    // check if the cacheFileRoot has been set: this is needed because after directory pruning,
the
+    // cacheFileRoot could have been changed and not be the same as the original selectionRoot
+    Path selectionRoot = formatSelection.getSelection().getCacheFileRoot() != null ?
+            formatSelection.getSelection().getCacheFileRoot() :
+            formatSelection.getSelection().getSelectionRoot();
+    Metadata_V4.MetadataSummary metadataSummary = Metadata.getSummary(fs, selectionRoot,
false, ParquetReaderConfig.builder().build());
+
+    return metadataSummary != null ? new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(true,
metadataSummary) :
+      new ImmutablePair<Boolean, Metadata_V4.MetadataSummary>(false, null);
+  }
+
+  /**
+   * Collects counts for each aggregation call by using the metadata summary information
+   * Will return empty result map if was not able to determine count for at least one aggregation
call.
+   *
+   * For each aggregate call will determine if count can be calculated. Collects counts only
for COUNT function.
+   *   1. First, we get the total row count from the metadata summary.
+   *   2. For COUNT(*) and COUNT(<non null column>) and COUNT(<implicit column>),
the count = total row count
+   *   3. For COUNT(nullable column), count = (total row count - column's null count)
+   *   4. Also count can not be calculated for parition columns.
+   *
+   * @param settings planner options
+   * @param metadataSummary metadata summary containing row counts and column counts
+   * @param agg aggregate relational expression
+   * @param scan scan relational expression
+   * @param project project relational expression
+   * @return result map where key is count column name, value is count value
+   */
+  private Map<String, Long> collectCounts(PlannerSettings settings, Metadata_V4.MetadataSummary
metadataSummary,
+                                          Aggregate agg, TableScan scan, Project project)
{
+    final Set<String> implicitColumnsNames = ColumnExplorer.initImplicitFileColumns(settings.getOptions()).keySet();
+    final long totalRecordCount = metadataSummary.getTotalRowCount();
+    final LinkedHashMap<String, Long> result = new LinkedHashMap<>();
+
+    for (int i = 0; i < agg.getAggCallList().size(); i++) {
+      AggregateCall aggCall = agg.getAggCallList().get(i);
+      long cnt;
+
+      // rule can be applied only for count function, return empty counts
+      if (!"count".equalsIgnoreCase(aggCall.getAggregation().getName()) ) {
+        return ImmutableMap.of();
+      }
+
+      if (CountToDirectScanUtils.containsStarOrNotNullInput(aggCall, agg)) {
+        cnt = totalRecordCount;
+
+      } else if (aggCall.getArgList().size() == 1) {
+        // count(columnName) ==> Agg ( Scan )) ==> columnValueCount
+        int index = aggCall.getArgList().get(0);
+
+        if (project != null) {
+          // project in the middle of Agg and Scan : Only when input of AggCall is a RexInputRef
in Project, we find the index of Scan's field.
+          // For instance,
+          // Agg - count($0)
+          //  \
+          //  Proj - Exp={$1}
+          //    \
+          //   Scan (col1, col2).
+          // return count of "col2" in Scan's metadata, if found.
+          if (!(project.getProjects().get(index) instanceof RexInputRef)) {
+            return ImmutableMap.of(); // do not apply for all other cases.
+          }
+
+          index = ((RexInputRef) project.getProjects().get(index)).getIndex();
+        }
+
+        String columnName = scan.getRowType().getFieldNames().get(index).toLowerCase();
+
+        // for implicit column count will be the same as total record count
+        if (implicitColumnsNames.contains(columnName)) {
+          cnt = totalRecordCount;
+        } else {
+          SchemaPath simplePath = SchemaPath.getSimplePath(columnName);
+
+          if (ColumnExplorer.isPartitionColumn(settings.getOptions(), simplePath)) {
+            return ImmutableMap.of();
+          }
+
+          Metadata_V4.ColumnTypeMetadata_v4 columnMetadata = metadataSummary.getColumnTypeInfo(new
Metadata_V4.ColumnTypeMetadata_v4.Key(simplePath));
+
+         if (columnMetadata == null) {
+            // if column stats is not available don't apply this rule, return empty counts
+            return ImmutableMap.of();
+          } else {
+           // count of a nullable column = (total row count - column's null count)
+           cnt = totalRecordCount - columnMetadata.totalNullCount;
 
 Review comment:
   It is mostly a question to the previous commit. For V3 and older versions were possible
cases when statistics were unavailable, but its `ColumnTypeMetadata` was still present. For
such cases, `nullsCout` was set to `-1` (`GroupScan.NO_COLUMN_STATS`).
   Should this check be added here, or v4 does not store `ColumnTypeMetadata` for such columns?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message