drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [03/17] git commit: Refactor trait pull up to common SubsetTransformer. Update Prules to use new class and update FilterPrule to use all instead of best to work with Optiq 0.9.
Date Tue, 29 Jul 2014 15:38:15 GMT
Refactor trait pull up to common SubsetTransformer. Update Prules to use new class and update
FilterPrule to use all instead of best to work with Optiq 0.9.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/913fad85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/913fad85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/913fad85

Branch: refs/heads/master
Commit: 913fad858bbb751cde47b15a2cffda7f4797bcad
Parents: 0cbf6ad
Author: Jacques Nadeau <jacques@apache.org>
Authored: Sat Jul 26 21:36:53 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sat Jul 26 21:41:10 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/FilterPrule.java      |  31 +++---
 .../exec/planner/physical/HashAggPrule.java     |  64 ++++++-----
 .../exec/planner/physical/JoinPruleBase.java    |  82 +++++++-------
 .../exec/planner/physical/ProjectPrule.java     |  39 +++----
 .../exec/planner/physical/StreamAggPrule.java   | 106 +++++++++----------
 .../planner/physical/SubsetTransformer.java     |  69 ++++++++++++
 .../exec/planner/physical/WriterPrule.java      |  34 +++---
 .../java/org/apache/drill/TestTpchExplain.java  |   6 ++
 8 files changed, 257 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
index e72a780..c15c5e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.planner.physical;
 
 import org.apache.drill.exec.planner.logical.DrillFilterRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
 
 public class FilterPrule extends Prule {
   public static final RelOptRule INSTANCE = new FilterPrule();
@@ -41,19 +38,25 @@ public class FilterPrule extends Prule {
 
     RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
     RelNode convertedInput = convert(input, traits);
-    boolean transform = false;
-    
-    if (convertedInput instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedInput;
-      RelNode bestRel = null;
-      if ((bestRel = subset.getBest()) != null) {
-        call.transformTo(new FilterPrel(filter.getCluster(), bestRel.getTraitSet(), convertedInput, filter.getCondition()));  
-        transform = true;
-      } 
-    }
+
+    boolean transform = new Subset(call).go(filter, convertedInput);
+
     if (!transform) {
       call.transformTo(new FilterPrel(filter.getCluster(), convertedInput.getTraitSet(), convertedInput, filter.getCondition()));
     }
   }
-  
+
+
+  private class Subset extends SubsetTransformer<DrillFilterRel, RuntimeException> {
+
+    public Subset(RelOptRuleCall call) {
+      super(call);
+    }
+
+    @Override
+    public RelNode convertChild(DrillFilterRel filter, RelNode rel) {
+      return new FilterPrel(filter.getCluster(), rel.getTraitSet(), rel, filter.getCondition());
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index d8b2338..4d42f66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -26,8 +26,8 @@ import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
 import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -84,34 +84,8 @@ public class HashAggPrule extends AggPruleBase {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
 
           RelNode convertedInput = convert(input, traits);
+          new TwoPhaseSubset(call, distOnAllKeys).go(aggregate, convertedInput);
 
-          if (convertedInput instanceof RelSubset) {
-            RelSubset subset = (RelSubset) convertedInput;
-            for (RelNode rel : subset.getRelList()) {
-              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
-                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
-                RelNode newInput = convert(input, traits);
-
-                HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
-                    aggregate.getGroupSet(),
-                    aggregate.getAggCallList(), 
-                    OperatorPhase.PHASE_1of2);
-
-                HashToRandomExchangePrel exch =
-                    new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
-                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
-
-                HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, exch,
-                                                         aggregate.getGroupSet(),
-                                                         phase1Agg.getPhase2AggCalls(), 
-                                                         OperatorPhase.PHASE_2of2); 
-                                                    
-
-                call.transformTo(phase2Agg);
-              }
-            }
-          }
         }
       }
     } catch (InvalidRelException e) {
@@ -119,6 +93,40 @@ public class HashAggPrule extends AggPruleBase {
     }
   }
 
+
+  private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> {
+    final RelTrait distOnAllKeys;
+
+    public TwoPhaseSubset(RelOptRuleCall call, RelTrait distOnAllKeys) {
+      super(call);
+      this.distOnAllKeys = distOnAllKeys;
+    }
+
+    @Override
+    public RelNode convertChild(DrillAggregateRel aggregate, RelNode input) throws InvalidRelException {
+
+      RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, input.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE));
+      RelNode newInput = convert(input, traits);
+
+      HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+          aggregate.getGroupSet(),
+          aggregate.getAggCallList(),
+          OperatorPhase.PHASE_1of2);
+
+      HashToRandomExchangePrel exch =
+          new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+              phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+      HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, exch,
+                                               aggregate.getGroupSet(),
+                                               phase1Agg.getPhase2AggCalls(),
+                                               OperatorPhase.PHASE_2of2);
+
+      return phase2Agg;
+    }
+
+  }
+
   private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate,
                                       RelNode input, RelTraitSet traits) throws InvalidRelException {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 336e34c..d6bd711 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -92,39 +92,39 @@ public abstract class JoinPruleBase extends Prule {
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws InvalidRelException {
-    
-    /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:  
+
+    /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:
      *   1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side.
      *   2) Plan2: distributed by l1 for left side, by r1 for right side.
      *   3) Plan3: distributed by l2 for left side, by r2 for right side.
      *   ...
      *      Plan_(k+1): distributed by l_k for left side, by r_k by right side.
-     *   
-     *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey. 
+     *
+     *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
      */
-         
+
     DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
     DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
- 
+
     createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
-    
+
     assert (join.getLeftKeys().size() == join.getRightKeys().size());
-    
+
     if (!hashSingleKey)
       return;
-    
+
     int numJoinKeys = join.getLeftKeys().size();
     if (numJoinKeys > 1) {
       for (int i = 0; i< numJoinKeys; i++) {
         hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1))));
         hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1))));
-        
+
         createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
       }
     }
   }
 
-      
+
   // Create join plan with both left and right children hash distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child and the plan will contain
   // sort converter if necessary to provide the collation.
@@ -170,9 +170,9 @@ public abstract class JoinPruleBase extends Prule {
   // is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter
   // if necessary to provide the collation.
   protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
-      PhysicalJoinType physicalJoinType,
-      RelNode left, RelNode right,
-      RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException {
+      final PhysicalJoinType physicalJoinType,
+      final RelNode left, final RelNode right,
+      final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
 
     DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
     RelTraitSet traitsRight = null;
@@ -183,37 +183,35 @@ public abstract class JoinPruleBase extends Prule {
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
     }
 
-    RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
-    RelNode convertedLeft = convert(left, traitsLeft);
-    RelNode convertedRight = convert(right, traitsRight);
+    final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelNode convertedLeft = convert(left, traitsLeft);
+    final RelNode convertedRight = convert(right, traitsRight);
 
-    traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    new SubsetTransformer<DrillJoinRel, InvalidRelException>(call){
 
-    DrillJoinRelBase newJoin = null;
-
-    if (convertedLeft instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedLeft;
-      for (RelNode rel : subset.getRelList()) {
-        if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
-          DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-          if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-            traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist);
-          } else {
-            traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
-          }
-
-          RelNode newLeft = convert(left, traitsLeft);
-          if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-            newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
-                                       join.getJoinType());
-          } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-            newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
-                                        join.getJoinType());
-          }
-          call.transformTo(newJoin) ;
+      public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+        DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+        RelTraitSet newTraitsLeft;
+        if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+          newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
+        } else {
+          newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+        }
+        Character.digit(1, 1);
+        RelNode newLeft = convert(left, newTraitsLeft);
+        if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+          return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
+                                     join.getJoinType());
+        } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+          return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
+                                      join.getJoinType());
+        } else{
+          return null;
         }
+
       }
-    }
-  }
 
+    }.go(join, convertedLeft);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 02e6d44..833aaae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -23,19 +23,15 @@ import java.util.Map;
 
 import net.hydromatic.linq4j.Ord;
 
-import org.apache.drill.exec.planner.common.DrillProjectRelBase;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
-import org.eigenbase.rel.ProjectRel;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelCollationImpl;
 import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.rel.RelNode;
-import org.eigenbase.relopt.Convention;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
@@ -64,26 +60,33 @@ public class ProjectPrule extends Prule {
     RelNode convertedInput = convert(input, traits);
 
     Map<Integer, Integer> inToOut = getProjectMap(project);
+    boolean traitPull = new ProjectTraitPull(call, inToOut).go(project, convertedInput);
 
-    if (convertedInput instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedInput;
-      for (RelNode rel : subset.getRelList()) {
-        if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
-          DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-          RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+    if(!traitPull){
+      call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput, project.getProjects(), project.getRowType()));
+    }
+  }
 
+  private class ProjectTraitPull extends SubsetTransformer<DrillProjectRel, RuntimeException> {
+    final Map<Integer, Integer> inToOut;
 
-          DrillDistributionTrait newDist = convertDist(childDist, inToOut);
-          RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+    public ProjectTraitPull(RelOptRuleCall call, Map<Integer, Integer> inToOut) {
+      super(call);
+      this.inToOut = inToOut;
+    }
 
-          call.transformTo(new ProjectPrel(project.getCluster(), project.getTraitSet().plus(newDist).plus(newCollation).plus(Prel.DRILL_PHYSICAL),
-              rel, project.getProjects(), project.getRowType()));
-        }
-      }
+    @Override
+    public RelNode convertChild(DrillProjectRel project, RelNode rel) throws RuntimeException {
+      DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+      RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
 
-    } else{
-      call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput, project.getProjects(), project.getRowType()));
+
+      DrillDistributionTrait newDist = convertDist(childDist, inToOut);
+      RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+      RelTraitSet newProjectTraits = rel.getTraitSet().plus(newDist).plus(newCollation);
+      return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(), project.getRowType());
     }
+
   }
 
   private DrillDistributionTrait convertDist(DrillDistributionTrait srcDist, Map<Integer, Integer> inToOut) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 0375161..4191184 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 import net.hydromatic.optiq.util.BitSets;
 
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.eigenbase.rel.InvalidRelException;
@@ -56,7 +57,7 @@ public class StreamAggPrule extends AggPruleBase {
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = aggregate.getChild();
-    RelCollation collation = getCollation(aggregate);
+    final RelCollation collation = getCollation(aggregate);
     RelTraitSet traits = null;
 
     if (aggregate.containsDistinctCall()) {
@@ -67,44 +68,40 @@ public class StreamAggPrule extends AggPruleBase {
     try {
       if (aggregate.getGroupSet().isEmpty()) {
         DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
-        RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+        final RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
 
         if (create2PhasePlan(call, aggregate)) {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
 
           RelNode convertedInput = convert(input, traits);
+          new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
 
-          if (convertedInput instanceof RelSubset) {
-            RelSubset subset = (RelSubset) convertedInput;
-            for (RelNode rel : subset.getRelList()) {
-              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
-                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
-                RelNode newInput = convert(input, traits);
-
-                StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
-                    aggregate.getGroupSet(),
-                    aggregate.getAggCallList(),
-                    OperatorPhase.PHASE_1of2);
-
-                UnionExchangePrel exch =
-                    new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
-
-                StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
-                    aggregate.getGroupSet(),
-                    phase1Agg.getPhase2AggCalls(),
-                    OperatorPhase.PHASE_2of2);
-
-                call.transformTo(phase2Agg);
-              }
+            public RelNode convertChild(final DrillAggregateRel join, final RelNode rel) throws InvalidRelException {
+              DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+              RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+              RelNode newInput = convert(input, traits);
+
+              StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                  aggregate.getGroupSet(),
+                  aggregate.getAggCallList(),
+                  OperatorPhase.PHASE_1of2);
+
+              UnionExchangePrel exch =
+                  new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
+
+              return  new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
+                  aggregate.getGroupSet(),
+                  phase1Agg.getPhase2AggCalls(),
+                  OperatorPhase.PHASE_2of2);
             }
-          }
+          }.go(aggregate, convertedInput);
+
         } else {
           createTransformRequest(call, aggregate, input, singleDistTrait);
         }
       } else {
         // hash distribute on all grouping keys
-        DrillDistributionTrait distOnAllKeys =
+        final DrillDistributionTrait distOnAllKeys =
             new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
                                        ImmutableList.copyOf(getDistributionField(aggregate, true)));
 
@@ -126,39 +123,34 @@ public class StreamAggPrule extends AggPruleBase {
 
         if (create2PhasePlan(call, aggregate)) {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
-
           RelNode convertedInput = convert(input, traits);
 
-          if (convertedInput instanceof RelSubset) {
-            RelSubset subset = (RelSubset) convertedInput;
-            for (RelNode rel : subset.getRelList()) {
-              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
-                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
-                RelNode newInput = convert(input, traits);
-
-                StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
-                    aggregate.getGroupSet(),
-                    aggregate.getAggCallList(),
-                    OperatorPhase.PHASE_1of2);
-
-                int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-
-                HashToMergeExchangePrel exch =
-                    new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
-                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
-                        collation,
-                        numEndPoints);
-
-                StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
-                    aggregate.getGroupSet(),
-                    phase1Agg.getPhase2AggCalls(),
-                    OperatorPhase.PHASE_2of2);
-
-                call.transformTo(phase2Agg);
-              }
+          new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
+
+            public RelNode convertChild(final DrillAggregateRel aggregate, final RelNode rel) throws InvalidRelException {
+              DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+              RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist);
+              RelNode newInput = convert(input, traits);
+
+              StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                  aggregate.getGroupSet(),
+                  aggregate.getAggCallList(),
+                  OperatorPhase.PHASE_1of2);
+
+              int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+              HashToMergeExchangePrel exch =
+                  new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+                      phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+                      collation,
+                      numEndPoints);
+
+              return new StreamAggPrel(aggregate.getCluster(), traits, exch,
+                  aggregate.getGroupSet(),
+                  phase1Agg.getPhase2AggCalls(),
+                  OperatorPhase.PHASE_2of2);
             }
-          }
+          }.go(aggregate, convertedInput);
         }
       }
     } catch (InvalidRelException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
new file mode 100644
index 0000000..450b197
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
+
+public abstract class SubsetTransformer<T extends RelNode, E extends Exception> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubsetTransformer.class);
+
+  public abstract RelNode convertChild(T current, RelNode child) throws E;
+
+  private final RelOptRuleCall call;
+
+  public SubsetTransformer(RelOptRuleCall call){
+    this.call = call;
+  }
+
+  public RelTraitSet newTraitSet(RelTrait... traits){
+    RelTraitSet set = call.getPlanner().emptyTraitSet();
+    for(RelTrait t : traits){
+      set = set.plus(t);
+    }
+    return set;
+
+  }
+
+  boolean go(T n, RelNode candidateSet) throws E {
+    if( !(candidateSet instanceof RelSubset) ) return false;
+
+    boolean transform = false;
+
+    for (RelNode rel : ((RelSubset)candidateSet).getRelList()) {
+      if (!isDefaultDist(rel)) {
+        RelNode out = convertChild(n, rel);
+        if(out != null){
+          call.transformTo(out);
+          transform = true;
+
+        }
+      }
+    }
+
+    return transform;
+  }
+
+  private boolean isDefaultDist(RelNode n){
+    return n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
index 42a9984..15d94fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
@@ -45,25 +45,29 @@ public class WriterPrule extends Prule{
     final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
     final RelNode convertedInput = convert(input, traits);
 
-    if (convertedInput instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedInput;
-      for (RelNode rel : subset.getRelList()) {
-        if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
-          DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-          RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
-
-          DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(),
-              writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
-              rel, writer.getCreateTableEntry());
-
-          call.transformTo(newWriter);
-        }
-      }
-    } else {
+    if (!new WriteTraitPull(call).go(writer, convertedInput)) {
       DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(), convertedInput.getTraitSet(),
           convertedInput, writer.getCreateTableEntry());
 
       call.transformTo(newWriter);
     }
   }
+
+  private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException> {
+
+    public WriteTraitPull(RelOptRuleCall call) {
+      super(call);
+    }
+
+    @Override
+    public RelNode convertChild(DrillWriterRelBase writer, RelNode rel) throws RuntimeException {
+      DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+      RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+
+      return new WriterPrel(writer.getCluster(),
+          writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
+          rel, writer.getCreateTableEntry());
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
index 7dc5af7..1d67a3a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
@@ -133,6 +133,12 @@ public class TestTpchExplain extends BaseTestQuery{
   }
 
   @Test
+  public void tpch19_1() throws Exception{
+    doExplain("queries/tpch/19_1.sql");
+  }
+
+
+  @Test
   public void tpch20() throws Exception{
     doExplain("queries/tpch/20.sql");
   }


Mime
View raw message