drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [1/2] git commit: Create 2 phase plan for qualified plain aggregates (no group-by).
Date Fri, 16 May 2014 20:33:59 GMT
Repository: incubator-drill
Updated Branches:
  refs/heads/master cdd2ce905 -> 4ea36c3f1


Create 2 phase plan for qualified plain aggregates (no group-by).


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/62a8bf2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/62a8bf2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/62a8bf2f

Branch: refs/heads/master
Commit: 62a8bf2f582acb28d35b704a12e1b75ecbc8aa9f
Parents: cdd2ce9
Author: Aman Sinha <asinha@maprtech.com>
Authored: Thu May 15 18:19:27 2014 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Thu May 15 18:20:03 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/StreamAggPrule.java   | 36 ++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62a8bf2f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index bccdea5..ff648a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -65,10 +65,40 @@ public class StreamAggPrule extends AggPruleBase {
     try {
       if (aggregate.getGroupSet().isEmpty()) {
         DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(singleDist);
-        createTransformRequest(call, aggregate, input, traits);
-      } else {
+        RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+        
+        if (create2PhasePlan(call, aggregate)) {
+          traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+          RelNode convertedInput = convert(input, traits);  
+
+          if (convertedInput instanceof RelSubset) {
+            RelSubset subset = (RelSubset) convertedInput;
+            for (RelNode rel : subset.getRelList()) {
+              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
{
+                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
             
+                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+                RelNode newInput = convert(input, traits);
+
+                StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits,
newInput,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                UnionExchangePrel exch = 
+                    new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
+        
+                StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), singleDistTrait,
exch,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
 
+                call.transformTo(phase2Agg);  
+              }
+            }
+          }
+        } else {        
+          createTransformRequest(call, aggregate, input, singleDistTrait);
+        }
+      } else {
         // hash distribute on all grouping keys
         DrillDistributionTrait distOnAllKeys = 
             new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,



Mime
View raw message