Refactor trait pull up to common SubsetTransformer. Update Prules to use new class and update
FilterPrule to use all instead of best to work with Optiq 0.9.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/913fad85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/913fad85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/913fad85
Branch: refs/heads/master
Commit: 913fad858bbb751cde47b15a2cffda7f4797bcad
Parents: 0cbf6ad
Author: Jacques Nadeau <jacques@apache.org>
Authored: Sat Jul 26 21:36:53 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sat Jul 26 21:41:10 2014 -0700
----------------------------------------------------------------------
.../exec/planner/physical/FilterPrule.java | 31 +++---
.../exec/planner/physical/HashAggPrule.java | 64 ++++++-----
.../exec/planner/physical/JoinPruleBase.java | 82 +++++++-------
.../exec/planner/physical/ProjectPrule.java | 39 +++----
.../exec/planner/physical/StreamAggPrule.java | 106 +++++++++----------
.../planner/physical/SubsetTransformer.java | 69 ++++++++++++
.../exec/planner/physical/WriterPrule.java | 34 +++---
.../java/org/apache/drill/TestTpchExplain.java | 6 ++
8 files changed, 257 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
index e72a780..c15c5e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.planner.physical;
import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
public class FilterPrule extends Prule {
public static final RelOptRule INSTANCE = new FilterPrule();
@@ -41,19 +38,25 @@ public class FilterPrule extends Prule {
RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
RelNode convertedInput = convert(input, traits);
- boolean transform = false;
-
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- RelNode bestRel = null;
- if ((bestRel = subset.getBest()) != null) {
- call.transformTo(new FilterPrel(filter.getCluster(), bestRel.getTraitSet(), convertedInput,
filter.getCondition()));
- transform = true;
- }
- }
+
+ boolean transform = new Subset(call).go(filter, convertedInput);
+
if (!transform) {
call.transformTo(new FilterPrel(filter.getCluster(), convertedInput.getTraitSet(),
convertedInput, filter.getCondition()));
}
}
-
+
+
+ private class Subset extends SubsetTransformer<DrillFilterRel, RuntimeException>
{
+
+ public Subset(RelOptRuleCall call) {
+ super(call);
+ }
+
+ @Override
+ public RelNode convertChild(DrillFilterRel filter, RelNode rel) {
+ return new FilterPrel(filter.getCluster(), rel.getTraitSet(), rel, filter.getCondition());
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index d8b2338..4d42f66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -26,8 +26,8 @@ import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.ImmutableList;
@@ -84,34 +84,8 @@ public class HashAggPrule extends AggPruleBase {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
RelNode convertedInput = convert(input, traits);
+ new TwoPhaseSubset(call, distOnAllKeys).go(aggregate, convertedInput);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
- RelNode newInput = convert(input, traits);
-
- HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList(),
- OperatorPhase.PHASE_1of2);
-
- HashToRandomExchangePrel exch =
- new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
- phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
-
- HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits,
exch,
- aggregate.getGroupSet(),
- phase1Agg.getPhase2AggCalls(),
- OperatorPhase.PHASE_2of2);
-
-
- call.transformTo(phase2Agg);
- }
- }
- }
}
}
} catch (InvalidRelException e) {
@@ -119,6 +93,40 @@ public class HashAggPrule extends AggPruleBase {
}
}
+
+ private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException>
{
+ final RelTrait distOnAllKeys;
+
+ public TwoPhaseSubset(RelOptRuleCall call, RelTrait distOnAllKeys) {
+ super(call);
+ this.distOnAllKeys = distOnAllKeys;
+ }
+
+ @Override
+ public RelNode convertChild(DrillAggregateRel aggregate, RelNode input) throws InvalidRelException
{
+
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, input.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE));
+ RelNode newInput = convert(input, traits);
+
+ HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
+
+ HashToRandomExchangePrel exch =
+ new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+ HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
+
+ return phase2Agg;
+ }
+
+ }
+
private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate,
RelNode input, RelTraitSet traits) throws InvalidRelException
{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 336e34c..d6bd711 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -92,39 +92,39 @@ public abstract class JoinPruleBase extends Prule {
PhysicalJoinType physicalJoinType,
RelNode left, RelNode right,
RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws
InvalidRelException {
-
- /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following
options of plan:
+
+ /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following
options of plan:
* 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k)
for right side.
* 2) Plan2: distributed by l1 for left side, by r1 for right side.
* 3) Plan3: distributed by l2 for left side, by r2 for right side.
* ...
* Plan_(k+1): distributed by l_k for left side, by r_k by right side.
- *
- * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
+ *
+ * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
*/
-
+
DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
-
+
createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight,
hashLeftPartition, hashRightPartition);
-
+
assert (join.getLeftKeys().size() == join.getRightKeys().size());
-
+
if (!hashSingleKey)
return;
-
+
int numJoinKeys = join.getLeftKeys().size();
if (numJoinKeys > 1) {
for (int i = 0; i< numJoinKeys; i++) {
hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1))));
hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1))));
-
+
createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight,
hashLeftPartition, hashRightPartition);
}
}
}
-
+
// Create join plan with both left and right children hash distributed. If the physical
join type
// is MergeJoin, a collation must be provided for both left and right child and the plan
will contain
// sort converter if necessary to provide the collation.
@@ -170,9 +170,9 @@ public abstract class JoinPruleBase extends Prule {
// is MergeJoin, a collation must be provided for both left and right child and the plan
will contain sort converter
// if necessary to provide the collation.
protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
- PhysicalJoinType physicalJoinType,
- RelNode left, RelNode right,
- RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException
{
+ final PhysicalJoinType physicalJoinType,
+ final RelNode left, final RelNode right,
+ final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException
{
DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
RelTraitSet traitsRight = null;
@@ -183,37 +183,35 @@ public abstract class JoinPruleBase extends Prule {
traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
}
- RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
- RelNode convertedLeft = convert(left, traitsLeft);
- RelNode convertedRight = convert(right, traitsRight);
+ final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ final RelNode convertedLeft = convert(left, traitsLeft);
+ final RelNode convertedRight = convert(right, traitsRight);
- traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ new SubsetTransformer<DrillJoinRel, InvalidRelException>(call){
- DrillJoinRelBase newJoin = null;
-
- if (convertedLeft instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedLeft;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist);
- } else {
- traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
- }
-
- RelNode newLeft = convert(left, traitsLeft);
- if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
- newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight,
join.getCondition(),
- join.getJoinType());
- } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight,
join.getCondition(),
- join.getJoinType());
- }
- call.transformTo(newJoin) ;
+ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException
{
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet newTraitsLeft;
+ if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+ newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
+ } else {
+ newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+ }
+ Character.digit(1, 1);
+ RelNode newLeft = convert(left, newTraitsLeft);
+ if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+ return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight,
join.getCondition(),
+ join.getJoinType());
+ } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+ return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight,
join.getCondition(),
+ join.getJoinType());
+ } else{
+ return null;
}
+
}
- }
- }
+ }.go(join, convertedLeft);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 02e6d44..833aaae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -23,19 +23,15 @@ import java.util.Map;
import net.hydromatic.linq4j.Ord;
-import org.apache.drill.exec.planner.common.DrillProjectRelBase;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
-import org.eigenbase.rel.ProjectRel;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelCollationImpl;
import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.RelFieldCollation;
import org.eigenbase.rel.RelNode;
-import org.eigenbase.relopt.Convention;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
@@ -64,26 +60,33 @@ public class ProjectPrule extends Prule {
RelNode convertedInput = convert(input, traits);
Map<Integer, Integer> inToOut = getProjectMap(project);
+ boolean traitPull = new ProjectTraitPull(call, inToOut).go(project, convertedInput);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
- DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+ if(!traitPull){
+ call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(),
convertedInput, project.getProjects(), project.getRowType()));
+ }
+ }
+ private class ProjectTraitPull extends SubsetTransformer<DrillProjectRel, RuntimeException>
{
+ final Map<Integer, Integer> inToOut;
- DrillDistributionTrait newDist = convertDist(childDist, inToOut);
- RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+ public ProjectTraitPull(RelOptRuleCall call, Map<Integer, Integer> inToOut) {
+ super(call);
+ this.inToOut = inToOut;
+ }
- call.transformTo(new ProjectPrel(project.getCluster(), project.getTraitSet().plus(newDist).plus(newCollation).plus(Prel.DRILL_PHYSICAL),
- rel, project.getProjects(), project.getRowType()));
- }
- }
+ @Override
+ public RelNode convertChild(DrillProjectRel project, RelNode rel) throws RuntimeException
{
+ DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
- } else{
- call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(),
convertedInput, project.getProjects(), project.getRowType()));
+
+ DrillDistributionTrait newDist = convertDist(childDist, inToOut);
+ RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+ RelTraitSet newProjectTraits = rel.getTraitSet().plus(newDist).plus(newCollation);
+ return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(),
project.getRowType());
}
+
}
private DrillDistributionTrait convertDist(DrillDistributionTrait srcDist, Map<Integer,
Integer> inToOut) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 0375161..4191184 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
import net.hydromatic.optiq.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.eigenbase.rel.InvalidRelException;
@@ -56,7 +57,7 @@ public class StreamAggPrule extends AggPruleBase {
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = aggregate.getChild();
- RelCollation collation = getCollation(aggregate);
+ final RelCollation collation = getCollation(aggregate);
RelTraitSet traits = null;
if (aggregate.containsDistinctCall()) {
@@ -67,44 +68,40 @@ public class StreamAggPrule extends AggPruleBase {
try {
if (aggregate.getGroupSet().isEmpty()) {
DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
- RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+ final RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
if (create2PhasePlan(call, aggregate)) {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
RelNode convertedInput = convert(input, traits);
+ new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
- RelNode newInput = convert(input, traits);
-
- StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits,
newInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList(),
- OperatorPhase.PHASE_1of2);
-
- UnionExchangePrel exch =
- new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
-
- StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait,
exch,
- aggregate.getGroupSet(),
- phase1Agg.getPhase2AggCalls(),
- OperatorPhase.PHASE_2of2);
-
- call.transformTo(phase2Agg);
- }
+ public RelNode convertChild(final DrillAggregateRel join, final RelNode rel)
throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+ RelNode newInput = convert(input, traits);
+
+ StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits,
newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
+
+ UnionExchangePrel exch =
+ new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
+
+ return new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
+ aggregate.getGroupSet(),
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
}
- }
+ }.go(aggregate, convertedInput);
+
} else {
createTransformRequest(call, aggregate, input, singleDistTrait);
}
} else {
// hash distribute on all grouping keys
- DrillDistributionTrait distOnAllKeys =
+ final DrillDistributionTrait distOnAllKeys =
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(aggregate,
true)));
@@ -126,39 +123,34 @@ public class StreamAggPrule extends AggPruleBase {
if (create2PhasePlan(call, aggregate)) {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
-
RelNode convertedInput = convert(input, traits);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
- RelNode newInput = convert(input, traits);
-
- StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits,
newInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList(),
- OperatorPhase.PHASE_1of2);
-
- int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-
- HashToMergeExchangePrel exch =
- new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
- phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
- collation,
- numEndPoints);
-
- StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits,
exch,
- aggregate.getGroupSet(),
- phase1Agg.getPhase2AggCalls(),
- OperatorPhase.PHASE_2of2);
-
- call.transformTo(phase2Agg);
- }
+ new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
+
+ public RelNode convertChild(final DrillAggregateRel aggregate, final RelNode
rel) throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist);
+ RelNode newInput = convert(input, traits);
+
+ StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits,
newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
+
+ int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+ HashToMergeExchangePrel exch =
+ new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+ collation,
+ numEndPoints);
+
+ return new StreamAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
}
- }
+ }.go(aggregate, convertedInput);
}
}
} catch (InvalidRelException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
new file mode 100644
index 0000000..450b197
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
+
+public abstract class SubsetTransformer<T extends RelNode, E extends Exception> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubsetTransformer.class);
+
+ public abstract RelNode convertChild(T current, RelNode child) throws E;
+
+ private final RelOptRuleCall call;
+
+ public SubsetTransformer(RelOptRuleCall call){
+ this.call = call;
+ }
+
+ public RelTraitSet newTraitSet(RelTrait... traits){
+ RelTraitSet set = call.getPlanner().emptyTraitSet();
+ for(RelTrait t : traits){
+ set = set.plus(t);
+ }
+ return set;
+
+ }
+
+ boolean go(T n, RelNode candidateSet) throws E {
+ if( !(candidateSet instanceof RelSubset) ) return false;
+
+ boolean transform = false;
+
+ for (RelNode rel : ((RelSubset)candidateSet).getRelList()) {
+ if (!isDefaultDist(rel)) {
+ RelNode out = convertChild(n, rel);
+ if(out != null){
+ call.transformTo(out);
+ transform = true;
+
+ }
+ }
+ }
+
+ return transform;
+ }
+
+ private boolean isDefaultDist(RelNode n){
+ return n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
index 42a9984..15d94fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
@@ -45,25 +45,29 @@ public class WriterPrule extends Prule{
final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
final RelNode convertedInput = convert(input, traits);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
- DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
-
- DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(),
- writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
- rel, writer.getCreateTableEntry());
-
- call.transformTo(newWriter);
- }
- }
- } else {
+ if (!new WriteTraitPull(call).go(writer, convertedInput)) {
DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(), convertedInput.getTraitSet(),
convertedInput, writer.getCreateTableEntry());
call.transformTo(newWriter);
}
}
+
+ private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException>
{
+
+ public WriteTraitPull(RelOptRuleCall call) {
+ super(call);
+ }
+
+ @Override
+ public RelNode convertChild(DrillWriterRelBase writer, RelNode rel) throws RuntimeException
{
+ DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+
+ return new WriterPrel(writer.getCluster(),
+ writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
+ rel, writer.getCreateTableEntry());
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
index 7dc5af7..1d67a3a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
@@ -133,6 +133,12 @@ public class TestTpchExplain extends BaseTestQuery{
}
@Test
+ public void tpch19_1() throws Exception{
+ doExplain("queries/tpch/19_1.sql");
+ }
+
+
+ @Test
public void tpch20() throws Exception{
doExplain("queries/tpch/20.sql");
}
|