Repository: drill
Updated Branches:
refs/heads/master 7bfcb40a0 -> 0a2518d7c
DRILL-4363: Row count based pruning for parquet table used in Limit n query.
Modify two existing unit test cases:
1) TestPartitionFilter.testMainQueryFalseCondition(): rowCount pruning is applied after the false condition is transformed into LIMIT 0.
2) TestLimitWithExchanges.testPushLimitPastUnionExchange(): modify the test case to use a JSON source, so that it is not affected by the new DrillPushLimitToScanRule.
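For illustration, a hedged sketch of the intended effect (the table path and file layout here are hypothetical, in the style of the new test below):

  // Suppose a parquet table is split across 3 files of roughly 8 rows each.
  final String sql = "select * from dfs_test.`/data/nation_parquet` limit 2";
  // With this change, planning calls ParquetGroupScan.applyLimit(2), which keeps only
  // the file(s) whose leading row groups already cover 2 rows; the LIMIT operator
  // still trims the final row count at execution time.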
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0a2518d7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0a2518d7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0a2518d7
Branch: refs/heads/master
Commit: 0a2518d7cf01a92a27a82e29edac5424bedf31d5
Parents: 7bfcb40
Author: Jinfeng Ni <jni@apache.org>
Authored: Tue Feb 2 15:31:47 2016 -0800
Committer: Jinfeng Ni <jni@apache.org>
Committed: Thu Feb 11 15:01:15 2016 -0800
----------------------------------------------------------------------
.../exec/physical/base/AbstractGroupScan.java | 21 ++++
.../drill/exec/physical/base/GroupScan.java | 12 +++
.../logical/DrillPushLimitToScanRule.java | 108 +++++++++++++++++++
.../exec/planner/logical/DrillRuleSets.java | 4 +-
.../exec/store/parquet/ParquetGroupScan.java | 55 +++++++++-
.../org/apache/drill/TestPartitionFilter.java | 3 +-
.../impl/limit/TestLimitWithExchanges.java | 43 ++++++--
.../exec/store/TestAffinityCalculator.java | 4 +-
8 files changed, 236 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 1277ec4..b6b1a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -128,4 +128,25 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupScan
public List<SchemaPath> getPartitionColumns() {
return Lists.newArrayList();
}
+
+ /**
+ * Default is not to support limit pushdown.
+ * @return false, as limit pushdown is not supported by default.
+ */
+ @Override
+ @JsonIgnore
+ public boolean supportsLimitPushdown() {
+ return false;
+ }
+
+ /**
+ * By default, return null to indicate that row-count based pruning is not supported.
+ * Each GroupScan subclass should override this method if it supports row-count based pruning.
+ */
+ @Override
+ @JsonIgnore
+ public GroupScan applyLimit(long maxRecords) {
+ return null;
+ }
+
}
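For context, a hedged sketch of how a GroupScan implementation could opt in to this new contract (the class name and helper are hypothetical, and the constructor plus the other abstract methods of AbstractGroupScan are omitted; ParquetGroupScan below is the only real implementation in this commit):

  // Hypothetical subclass opting in to limit pushdown.
  public class MyFormatGroupScan extends AbstractGroupScan {
    @Override
    public boolean supportsLimitPushdown() {
      return true; // advertise support so DrillPushLimitToScanRule can fire
    }

    @Override
    public GroupScan applyLimit(long maxRecords) {
      // Return a pruned copy that reads fewer files, or null when no reduction
      // is possible; on null, the rule leaves the plan unchanged.
      return prunedCopyOrNull(maxRecords); // hypothetical helper
    }
  }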
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 946c7e8..041f10a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -98,4 +98,16 @@ public interface GroupScan extends Scan, HasAffinity{
@JsonIgnore
public List<SchemaPath> getPartitionColumns();
+ /**
+ * Whether or not this GroupScan supports limit pushdown
+ */
+ public boolean supportsLimitPushdown();
+
+ /**
+ * Apply row-count based pruning for a "LIMIT n" query.
+ * @param maxRecords the number of rows requested from the group scan.
+ * @return a new instance of the group scan if the pruning is successful,
+ * or null if row-count based pruning is not supported or not successful.
+ */
+ public GroupScan applyLimit(long maxRecords);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
new file mode 100644
index 0000000..9f762f0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.logical;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class DrillPushLimitToScanRule extends RelOptRule {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushLimitToScanRule.class);
+
+ private DrillPushLimitToScanRule(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ public static DrillPushLimitToScanRule LIMIT_ON_SCAN = new DrillPushLimitToScanRule(
+ RelOptHelper.some(DrillLimitRel.class, RelOptHelper.any(DrillScanRel.class)), "DrillPushLimitToScanRule_LimitOnScan") {
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ DrillScanRel scanRel = call.rel(1);
+ return scanRel.getGroupScan().supportsLimitPushdown(); // For now only applies to Parquet.
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ DrillLimitRel limitRel = call.rel(0);
+ DrillScanRel scanRel = call.rel(1);
+ doOnMatch(call, limitRel, scanRel, null);
+ }
+ };
+
+ public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = new DrillPushLimitToScanRule(
+ RelOptHelper.some(DrillLimitRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), "DrillPushLimitToScanRule_LimitOnProject") {
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ DrillScanRel scanRel = call.rel(2);
+ return scanRel.getGroupScan().supportsLimitPushdown(); // For now only applies to Parquet.
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ DrillLimitRel limitRel = call.rel(0);
+ DrillProjectRel projectRel = call.rel(1);
+ DrillScanRel scanRel = call.rel(2);
+ doOnMatch(call, limitRel, scanRel, projectRel);
+ }
+ };
+
+
+ protected void doOnMatch(RelOptRuleCall call, DrillLimitRel limitRel, DrillScanRel scanRel, DrillProjectRel projectRel) {
+ try {
+ final int rowCountRequested = (int) limitRel.getRows();
+
+ final GroupScan newGroupScan = scanRel.getGroupScan().applyLimit(rowCountRequested);
+
+ if (newGroupScan == null ) {
+ return;
+ }
+
+ DrillScanRel newScanRel = new DrillScanRel(scanRel.getCluster(),
+ scanRel.getTraitSet(),
+ scanRel.getTable(),
+ newGroupScan,
+ scanRel.getRowType(),
+ scanRel.getColumns(),
+ scanRel.partitionFilterPushdown());
+
+ final RelNode newLimit;
+ if (projectRel != null) {
+ final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of((RelNode)newScanRel));
+ newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode)newProject));
+ } else {
+ newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode)newScanRel));
+ }
+
+ call.transformTo(newLimit);
+ logger.debug("Converted to a new DrillScanRel: " + newScanRel.getGroupScan());
+ } catch (Exception e) {
+ logger.warn("Exception while applying limit pushdown to the scan.", e);
+ }
+
+ }
+}
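To make the rewrite concrete, a hedged sketch of the transformation LIMIT_ON_PROJECT performs (plan shapes only, not actual EXPLAIN output):

  // before:  DrillLimitRel(fetch=[1])
  //            DrillProjectRel(...)
  //              DrillScanRel(ParquetGroupScan over files F1..F4)
  //
  // after:   DrillLimitRel(fetch=[1])
  //            DrillProjectRel(...)
  //              DrillScanRel(ParquetGroupScan over file F1 only)
  //
  // The limit itself is kept: applyLimit() prunes at file/row-group granularity,
  // so the LIMIT operator must still trim the result to exactly n rows.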
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index d9609d2..230cee2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -207,7 +207,9 @@ public class DrillRuleSets {
PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
- ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
+ ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext),
+ DrillPushLimitToScanRule.LIMIT_ON_SCAN,
+ DrillPushLimitToScanRule.LIMIT_ON_PROJECT
)
.build();
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index dfe9084..30e0846 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.calcite.util.Pair;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
@@ -485,12 +486,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private EndpointByteMap byteMap;
private int rowGroupIndex;
private String root;
+ private long rowCount; // rowCount = -1 indicates that all rows are included.
@JsonCreator
public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
- @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex)
{
+ @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex,
long rowCount) {
super(path, start, length);
this.rowGroupIndex = rowGroupIndex;
+ this.rowCount = rowCount;
}
public RowGroupReadEntry getRowGroupReadEntry() {
@@ -519,6 +522,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
public void setEndpointByteMap(EndpointByteMap byteMap) {
this.byteMap = byteMap;
}
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
}
private void init() throws IOException {
@@ -582,7 +590,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
int rgIndex = 0;
for (RowGroupMetadata rg : file.getRowGroups()) {
RowGroupInfo rowGroupInfo =
- new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex);
+ new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex, rg.getRowCount());
EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
for (String host : rg.getHostAffinity().keySet()) {
if (hostEndpointMap.containsKey(host)) {
@@ -791,6 +799,49 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
@Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
+ @Override
+ public GroupScan applyLimit(long maxRecords) {
+ Preconditions.checkArgument(rowGroupInfos.size() >= 0);
+
+ maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row -> 1 rowGroup.
+ // Further optimization: minimize the number of files chosen, or consider the affinity of the files chosen.
+ long count = 0;
+ int index = 0;
+ for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+ if (count < maxRecords) {
+ count += rowGroupInfo.getRowCount();
+ index ++;
+ } else {
+ break;
+ }
+ }
+
+ Set<String> fileNames = Sets.newHashSet(); // A HashSet keeps each fileName unique.
+ for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) {
+ fileNames.add(rowGroupInfo.getPath());
+ }
+
+ if (fileNames.size() == fileSet.size() ) {
+ // There is no reduction of rowGroups. Return the original groupScan.
+ logger.debug("applyLimit() does not apply!");
+ return null;
+ }
+
+ try {
+ FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot());
+ logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size());
+ return this.clone(newSelection);
+ } catch (IOException e) {
+ logger.warn("Could not apply rowcount based prune due to Exception : {}", e);
+ return null;
+ }
+ }
+
+ @Override
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
return true;
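As a worked example of the selection loop in applyLimit() above, a minimal self-contained sketch (the class and method names are illustrative, not Drill API):

  import java.util.Arrays;
  import java.util.List;

  public class ApplyLimitSketch {
    // How many leading row groups are needed to cover maxRecords rows.
    static int rowGroupsNeeded(List<Long> rowGroupCounts, long maxRecords) {
      maxRecords = Math.max(maxRecords, 1); // at least 1 row -> at least 1 rowGroup
      long count = 0;
      int index = 0;
      for (long rows : rowGroupCounts) {
        if (count < maxRecords) {
          count += rows;
          index++;
        } else {
          break;
        }
      }
      return index;
    }

    public static void main(String[] args) {
      // Row groups of 5, 10, and 25 rows: LIMIT 12 needs only the first two.
      System.out.println(rowGroupsNeeded(Arrays.asList(5L, 10L, 25L), 12)); // 2
      // LIMIT 0 is clamped to 1 row, so a single row group suffices.
      System.out.println(rowGroupsNeeded(Arrays.asList(5L, 10L, 25L), 0));  // 1
    }
  }

applyLimit() then maps the selected row groups back to their files and returns a cloned group scan over that smaller file set, or null when no file is eliminated.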
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index e5d6603..a2d101e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -257,7 +257,8 @@ public class TestPartitionFilter extends PlanTestBase {
public void testMainQueryFalseCondition() throws Exception {
String root = FileUtils.getResourceAsFile("/multilevel/parquet").toURI().toString();
String query = String.format("select * from (select dir0, o_custkey from dfs_test.`%s` where dir0='1994') t where 1 = 0", root);
- testExcludeFilter(query, 4, "Filter", 0);
+ // The 1 = 0 condition becomes LIMIT 0, which requires reading only one parquet file, instead of 4 for year '1994'.
+ testExcludeFilter(query, 1, "Filter", 0);
}
@Test // see DRILL-2712
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
index 789a536..18f181b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
@@ -27,6 +27,8 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestLimitWithExchanges extends BaseTestQuery {
+ final String WORKING_PATH = TestTools.getWorkingPath();
+ final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
@Test
public void testLimitWithExchanges() throws Exception{
@@ -36,23 +38,20 @@ public class TestLimitWithExchanges extends BaseTestQuery {
@Test
public void testPushLimitPastUnionExchange() throws Exception {
// Push limit past through UnionExchange.
- final String WORKING_PATH = TestTools.getWorkingPath();
- final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
-
try {
test("alter session set `planner.slice_target` = 1");
final String[] excludedPlan = {};
// case 1. single table query.
- final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 2", TEST_RES_PATH);
+ final String sql = String.format("select * from dfs_test.`%s/multilevel/json` limit 1 offset 2", TEST_RES_PATH);
final String[] expectedPlan ={"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Scan"};
testLimitHelper(sql, expectedPlan, excludedPlan, 1);
- final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 0", TEST_RES_PATH);
+ final String sql2 = String.format("select * from dfs_test.`%s/multilevel/json` limit 1 offset 0", TEST_RES_PATH);
final String[] expectedPlan2 = {"(?s)Limit\\(offset=\\[0\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
testLimitHelper(sql2, expectedPlan2, excludedPlan, 1);
- final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1", TEST_RES_PATH);
+ final String sql3 = String.format("select * from dfs_test.`%s/multilevel/json` limit 1", TEST_RES_PATH);
final String[] expectedPlan3 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
testLimitHelper(sql3, expectedPlan3, excludedPlan, 1);
@@ -79,9 +78,6 @@ public class TestLimitWithExchanges extends BaseTestQuery {
@Test
public void testNegPushLimitPastUnionExchange() throws Exception {
// Negative case: should not push limit past through UnionExchange.
- final String WORKING_PATH = TestTools.getWorkingPath();
- final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
-
try {
test("alter session set `planner.slice_target` = 1");
final String[] expectedPlan ={};
@@ -100,6 +96,35 @@ public class TestLimitWithExchanges extends BaseTestQuery {
}
}
+ @Test
+ public void testLimitImpactExchange() throws Exception {
+ try {
+ test("alter session set `planner.slice_target` = 5" );
+
+ // nation has 3 files, total 25 rows.
+ // Given slice_target = 5: if the # of rows to fetch is < 5, do NOT insert an Exchange, and the query should run in a single fragment;
+ // if the # of rows to fetch is >= 5, do insert an Exchange, and the query should run in multiple fragments.
+ final String sql = String.format("select * from dfs_test.`%s/tpchmulti/nation` limit 2", TEST_RES_PATH); // Test Limit_On_Scan rule.
+ final String sql2 = String.format("select n_nationkey + 1000 from dfs_test.`%s/tpchmulti/nation` limit 2", TEST_RES_PATH); // Test Limit_On_Project rule.
+ final String [] expectedPlan = {};
+ final String [] excludedPlan = {"UnionExchange"};
+
+ testLimitHelper(sql, expectedPlan, excludedPlan, 2);
+ testLimitHelper(sql2, expectedPlan, excludedPlan, 2);
+
+ final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/nation` limit 10", TEST_RES_PATH); // Test Limit_On_Scan rule.
+ final String sql4 = String.format("select n_nationkey + 1000 from dfs_test.`%s/tpchmulti/nation` limit 10", TEST_RES_PATH); // Test Limit_On_Project rule.
+
+ final String [] expectedPlan2 = {"UnionExchange"};
+ final String [] excludedPlan2 = {};
+
+ testLimitHelper(sql3, expectedPlan2, excludedPlan2, 10);
+ testLimitHelper(sql4, expectedPlan2, excludedPlan2, 10);
+ } finally {
+ test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
+ }
+ }
+
private void testLimitHelper(final String sql, final String[] expectedPlan, final String[]
excludedPattern, int expectedRecordCount) throws Exception {
// Validate the plan
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPattern);
http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
index 0999218..dadb850 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
@@ -72,7 +72,9 @@ public class TestAffinityCalculator extends ExecTest {
rowGroups.clear();
for (int i = 0; i < numberOfRowGroups; i++) {
- rowGroups.add(new ParquetGroupScan.RowGroupInfo(path, (long)i*rowGroupSize, (long)rowGroupSize, i));
+ // The buildRowGroups method seems not to be used at all. Pass -1 as rowCount.
+ // Maybe remove this method completely?
+ rowGroups.add(new ParquetGroupScan.RowGroupInfo(path, (long)i*rowGroupSize, (long)rowGroupSize, i, -1));
}
}