drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [08/14] git commit: DRILL-728: Show SelectionVectorRemover and single mode changes in text plan output.
Date Thu, 15 May 2014 20:05:40 GMT
DRILL-728: Show SelectionVectorRemover and single mode changes in text plan output.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d468f6d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d468f6d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d468f6d6

Branch: refs/heads/master
Commit: d468f6d6e6f15417b56f87bbdfbfdcc182bf0d4e
Parents: c7746ed
Author: Jacques Nadeau <jacques@apache.org>
Authored: Tue May 13 17:26:05 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu May 15 09:20:45 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/AggPruleBase.java     | 23 +++---
 .../planner/physical/BroadcastExchangePrel.java | 32 ++++++---
 .../physical/ConvertCountToDirectScan.java      |  2 +-
 .../drill/exec/planner/physical/FilterPrel.java | 28 +++++++-
 .../exec/planner/physical/FilterPrule.java      |  8 +--
 .../exec/planner/physical/HashAggPrel.java      | 49 +++++++++----
 .../exec/planner/physical/HashAggPrule.java     | 38 +++++-----
 .../exec/planner/physical/HashJoinPrel.java     | 43 +++++++-----
 .../physical/HashToMergeExchangePrel.java       | 40 ++++++-----
 .../physical/HashToRandomExchangePrel.java      | 37 +++++-----
 .../exec/planner/physical/JoinPruleBase.java    | 74 ++++++++++----------
 .../drill/exec/planner/physical/LimitPrel.java  | 22 ++++++
 .../drill/exec/planner/physical/LimitPrule.java | 10 +--
 .../exec/planner/physical/MergeJoinPrel.java    | 37 ++++++----
 .../physical/OrderedPartitionExchangePrel.java  | 20 ++++--
 .../drill/exec/planner/physical/Prel.java       | 16 +++--
 .../drill/exec/planner/physical/PrelUtil.java   | 30 ++++++--
 .../exec/planner/physical/PrelVisitor.java      | 26 +++++++
 .../exec/planner/physical/ProjectPrel.java      | 25 ++++++-
 .../exec/planner/physical/ProjectPrule.java     |  2 +-
 .../drill/exec/planner/physical/Prule.java      | 50 +++++++++++++
 .../exec/planner/physical/PushLimitToTopN.java  | 16 ++---
 .../drill/exec/planner/physical/ScanPrel.java   | 44 +++++++++---
 .../drill/exec/planner/physical/ScanPrule.java  |  2 +-
 .../drill/exec/planner/physical/ScreenPrel.java | 24 ++++++-
 .../exec/planner/physical/ScreenPrule.java      | 10 +--
 .../physical/SelectionVectorPrelVisitor.java    | 56 +++++++++++++++
 .../physical/SelectionVectorRemoverPrel.java    | 44 ++++++++++++
 .../physical/SingleMergeExchangePrel.java       | 39 ++++++-----
 .../drill/exec/planner/physical/SinglePrel.java | 50 +++++++++++++
 .../drill/exec/planner/physical/SortPrel.java   | 33 ++++++---
 .../drill/exec/planner/physical/SortPrule.java  | 35 +++++----
 .../exec/planner/physical/StreamAggPrel.java    | 28 ++++++--
 .../drill/exec/planner/physical/TopNPrel.java   | 30 +++++---
 .../planner/physical/UnionExchangePrel.java     | 35 ++++-----
 .../drill/exec/planner/physical/WriterPrel.java | 22 ++++++
 .../exec/planner/physical/WriterPrule.java      |  2 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |  3 +-
 .../apache/drill/exec/record/BatchSchema.java   | 11 ++-
 .../org/apache/drill/TestTpchSingleMode.java    |  1 -
 40 files changed, 805 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 3bdcc2e..563458e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -31,11 +31,11 @@ import org.eigenbase.relopt.RelOptRuleOperand;
 
 import com.google.common.collect.Lists;
 
-// abstract base class for the aggregation physical rules  
-public abstract class AggPruleBase extends RelOptRule {
+// abstract base class for the aggregation physical rules
+public abstract class AggPruleBase extends Prule {
 
   protected AggPruleBase(RelOptRuleOperand operand, String description) {
-    super(operand, description);   
+    super(operand, description);
   }
 
   protected List<DistributionField> getDistributionField(DrillAggregateRel rel, boolean allFields) {
@@ -48,22 +48,23 @@ public abstract class AggPruleBase extends RelOptRule {
       if (!allFields && groupByFields.size() == 1) {
         // if we are only interested in 1 grouping field, pick the first one for now..
         // but once we have num distinct values (NDV) statistics, we should pick the one
-        // with highest NDV. 
+        // with highest NDV.
         break;
       }
-    }    
-    
+    }
+
     return groupByFields;
   }
-  
+
   // Create 2 phase aggr plan for aggregates such as SUM, MIN, MAX
-  // If any of the aggregate functions are not one of these, then we 
-  // currently won't generate a 2 phase plan. 
+  // If any of the aggregate functions are not one of these, then we
+  // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
-    if (! PrelUtil.getPlannerSettings(call.getPlanner()).isMultiPhaseAggEnabled()) {
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode()) {
       return false;
     }
-    
+
     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       String name = aggCall.getAggregation().getName();
       if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index 292c1ba..a0704b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -19,6 +19,8 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -34,8 +36,8 @@ import org.eigenbase.relopt.RelOptCost;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 
-public class BroadcastExchangePrel extends SingleRel implements Prel {
-  
+public class BroadcastExchangePrel extends SinglePrel{
+
   public BroadcastExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
     super(cluster, traitSet, input);
     assert input.getConvention() == Prel.DRILL_PHYSICAL;
@@ -43,16 +45,16 @@ public class BroadcastExchangePrel extends SingleRel implements Prel {
 
   /**
    * In a BroadcastExchange, each sender is sending data to N receivers (for costing
-   * purposes we assume it is also sending to itself). 
+   * purposes we assume it is also sending to itself).
    */
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
 
     RelNode child = this.getChild();
-   
+
     double inputRows = RelMetadataQuery.getRowCount(child);
     int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
     double cpuCost = DrillCostBase.SVR_CPU_COST * inputRows ;
@@ -62,22 +64,32 @@ public class BroadcastExchangePrel extends SingleRel implements Prel {
   }
 
   @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     return new BroadcastExchangePrel(getCluster(), traitSet, sole(inputs));
   }
-  
+
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     Prel child = (Prel) this.getChild();
-    
+
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
-    
+
     //Currently, only accepts "NONE". For other, requires SelectionVectorRemover
     if (!childPOP.getSVMode().equals(SelectionVectorMode.NONE)) {
       childPOP = new SelectionVectorRemover(childPOP);
     }
 
     BroadcastExchange g = new BroadcastExchange(childPOP);
-    return g;    
+    return g;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
index 6ec295f..21956dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
@@ -70,7 +70,7 @@ import com.google.common.collect.Lists;
  * scan the whole parquet files.
  */
 
-public class ConvertCountToDirectScan extends RelOptRule {
+public class ConvertCountToDirectScan extends Prule {
 
   public static final RelOptRule AGG_ON_PROJ_ON_SCAN = new ConvertCountToDirectScan(
       RelOptHelper.some(DrillAggregateRel.class,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 99a0cdc..8420e08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.planner.common.DrillFilterRelBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.RelOptCluster;
@@ -45,7 +47,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     return new FilterPrel(getCluster(), traitSet, sole(inputs), getCondition());
   }
-  
+
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
 
@@ -53,11 +55,31 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
 
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
-    //Currently, Filter accepts "NONE", SV2, SV4.
-
     Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f);
 
     return p;
   }
 
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.ALL;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    SelectionVectorMode m = ((Prel) this.getChild()).getEncoding();
+    if(m == SelectionVectorMode.FOUR_BYTE) return SelectionVectorMode.FOUR_BYTE;
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
index ee1a022..1c1c611 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
@@ -29,7 +29,7 @@ import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
 import org.eigenbase.relopt.volcano.RelSubset;
 
-public class FilterPrule extends RelOptRule {
+public class FilterPrule extends Prule {
   public static final RelOptRule INSTANCE = new FilterPrule();
 
   private FilterPrule() {
@@ -43,16 +43,16 @@ public class FilterPrule extends RelOptRule {
 
     RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
     RelNode convertedInput = convert(input, traits);
-    
+
     if (convertedInput instanceof RelSubset) {
       RelSubset subset = (RelSubset) convertedInput;
       for (RelNode rel : subset.getRelList()) {
         if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
           call.transformTo(new FilterPrel(filter.getCluster(), rel.getTraitSet(), convertedInput, filter.getCondition()));
         }
-      }      
+      }
     } else{
-      call.transformTo(new FilterPrel(filter.getCluster(), convertedInput.getTraitSet(), convertedInput, filter.getCondition()));        
+      call.transformTo(new FilterPrel(filter.getCluster(), convertedInput.getTraitSet(), convertedInput, filter.getCondition()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index 066bed0..b2378be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 
 import net.hydromatic.linq4j.Ord;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.AggregateRelBase;
 import org.eigenbase.rel.InvalidRelException;
@@ -67,7 +69,7 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
     RelNode child = this.getChild();
     double inputRows = RelMetadataQuery.getRowCount(child);
@@ -80,22 +82,22 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
     cpuCost += DrillCostBase.FUNC_CPU_COST * numAggrFields * inputRows;
     double diskIOCost = 0; // assume in-memory for now until we enforce operator-level memory constraints
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */);    
+    return costFactory.makeCost(inputRows, cpuCost, diskIOCost, 0 /* network cost */);
   }
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
-    
+
     final List<String> childFields = getChild().getRowType().getFieldNames();
     final List<String> fields = getRowType().getFieldNames();
     List<NamedExpression> keys = Lists.newArrayList();
     List<NamedExpression> exprs = Lists.newArrayList();
-    
+
     for (int group : BitSets.toIter(groupSet)) {
       FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
       keys.add(new NamedExpression(fr, fr));
     }
-    
+
     for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
       FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
       LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
@@ -103,25 +105,46 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
     }
 
     Prel child = (Prel) this.getChild();
-    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), 
-        keys.toArray(new NamedExpression[keys.size()]), 
-        exprs.toArray(new NamedExpression[exprs.size()]), 
+    HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator),
+        keys.toArray(new NamedExpression[keys.size()]),
+        exprs.toArray(new NamedExpression[exprs.size()]),
         1.0f);
-    
-    return g;    
+
+    return g;
 
   }
-  
+
   private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
     List<LogicalExpression> args = Lists.newArrayList();
     for(Integer i : call.getArgList()){
       args.add(new FieldReference(fn.get(i)));
     }
-    
+
     // for count(1).
     if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
     LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
     return expr;
   }
- 
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 859941b..9395a1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -44,18 +44,18 @@ public class HashAggPrule extends AggPruleBase {
   public boolean matches(RelOptRuleCall call) {
     return PrelUtil.getPlannerSettings(call.getPlanner()).isHashAggEnabled();
   }
-  
+
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = call.rel(1);
 
     if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
-      // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or 
-      // if there are no grouping keys 
+      // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
+      // if there are no grouping keys
       return;
     }
-    
+
     RelTraitSet traits = null;
 
     try {
@@ -65,25 +65,25 @@ public class HashAggPrule extends AggPruleBase {
         createTransformRequest(call, aggregate, input, traits);
       } else {
         // hash distribute on all grouping keys
-        DrillDistributionTrait distOnAllKeys = 
-            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+        DrillDistributionTrait distOnAllKeys =
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
                                        ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
-    
+
         traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
         createTransformRequest(call, aggregate, input, traits);
 
         // hash distribute on single grouping key
-        DrillDistributionTrait distOnOneKey = 
-            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+        DrillDistributionTrait distOnOneKey =
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
                                        ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
-    
+
         traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
         createTransformRequest(call, aggregate, input, traits);
-        
+
         if (create2PhasePlan(call, aggregate)) {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
 
-          RelNode convertedInput = convert(input, traits);  
+          RelNode convertedInput = convert(input, traits);
 
           if (convertedInput instanceof RelSubset) {
             RelSubset subset = (RelSubset) convertedInput;
@@ -105,25 +105,25 @@ public class HashAggPrule extends AggPruleBase {
                                                          aggregate.getGroupSet(),
                                                          aggregate.getAggCallList());
 
-                call.transformTo(phase2Agg);                   
+                call.transformTo(phase2Agg);
               }
             }
-          }    
+          }
         }
-      } 
+      }
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());
     }
   }
 
-  private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate, 
+  private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate,
                                       RelNode input, RelTraitSet traits) throws InvalidRelException {
 
-    final RelNode convertedInput = convert(input, traits);
-    
+    final RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
+
     HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
                                          aggregate.getAggCallList());
-      
+
     call.transformTo(newAgg);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 7c68c8d..1a528d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
@@ -67,26 +68,26 @@ public class HashJoinPrel  extends DrillJoinRelBase implements Prel {
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
     double probeRowCount = RelMetadataQuery.getRowCount(this.getLeft());
     double buildRowCount = RelMetadataQuery.getRowCount(this.getRight());
-    
+
     // cpu cost of hashing the join keys for the build side
     double cpuCostBuild = DrillCostBase.HASH_CPU_COST * getRightKeys().size() * buildRowCount;
     // cpu cost of hashing the join keys for the probe side
     double cpuCostProbe = DrillCostBase.HASH_CPU_COST * getLeftKeys().size() * probeRowCount;
-      
+
     // cpu cost of evaluating each leftkey=rightkey join condition
     double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size();
-    
+
     double cpuCost = joinConditionCost * (buildRowCount + probeRowCount) + cpuCostBuild + cpuCostProbe;
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(buildRowCount + probeRowCount, cpuCost, 0, 0);    
+    return costFactory.makeCost(buildRowCount + probeRowCount, cpuCost, 0, 0);
   }
 
-  @Override  
-  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {    
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     final List<String> fields = getRowType().getFieldNames();
     assert isUnique(fields);
     final int leftCount = left.getRowType().getFieldCount();
@@ -96,12 +97,6 @@ public class HashJoinPrel  extends DrillJoinRelBase implements Prel {
     PhysicalOperator leftPop = implementInput(creator, 0, left);
     PhysicalOperator rightPop = implementInput(creator, leftCount, right);
 
-    //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
-    leftPop = PrelUtil.removeSvIfRequired(leftPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
-
-    //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
-    rightPop = PrelUtil.removeSvIfRequired(rightPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
-
     JoinRelType jtype = this.getJoinType();
 
     List<JoinCondition> conditions = Lists.newArrayList();
@@ -150,9 +145,6 @@ public class HashJoinPrel  extends DrillJoinRelBase implements Prel {
   private PhysicalOperator rename(PhysicalPlanCreator creator, PhysicalOperator inputOp, List<String> inputFields, List<String> outputFields) {
     List<NamedExpression> exprs = Lists.newArrayList();
 
-    //Currently, Project only accepts "NONE". For other, requires SelectionVectorRemover
-    inputOp = PrelUtil.removeSvIfRequired(inputOp, SelectionVectorMode.NONE);
-
     for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
       exprs.add(new NamedExpression(new FieldReference(pair.left), new FieldReference(pair.right)));
     }
@@ -162,5 +154,24 @@ public class HashJoinPrel  extends DrillJoinRelBase implements Prel {
     return proj;
   }
 
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getLeft(), getRight());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
index 252d0ef..45bc390 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -38,13 +39,13 @@ import org.eigenbase.relopt.RelTraitSet;
 import org.eigenbase.reltype.RelDataTypeField;
 
 
-public class HashToMergeExchangePrel extends SingleRel implements Prel {
+public class HashToMergeExchangePrel extends SinglePrel {
 
   private final List<DistributionField> distFields;
   private int numEndPoints = 0;
   private final RelCollation collation ;
-  
-  public HashToMergeExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, 
+
+  public HashToMergeExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
                                  List<DistributionField> fields,
                                  RelCollation collation,
                                  int numEndPoints) {
@@ -59,48 +60,49 @@ public class HashToMergeExchangePrel extends SingleRel implements Prel {
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
-    }    
+      return super.computeSelfCost(planner).multiplyBy(.1);
+    }
     RelNode child = this.getChild();
     double inputRows = RelMetadataQuery.getRowCount(child);
 
     int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
     double hashCpuCost = DrillCostBase.HASH_CPU_COST * inputRows * distFields.size();
     double svrCpuCost = DrillCostBase.SVR_CPU_COST * inputRows;
-    double mergeCpuCost = DrillCostBase.COMPARE_CPU_COST * inputRows * (Math.log(numEndPoints)/Math.log(2));    
+    double mergeCpuCost = DrillCostBase.COMPARE_CPU_COST * inputRows * (Math.log(numEndPoints)/Math.log(2));
     double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth;
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(inputRows, hashCpuCost + svrCpuCost + mergeCpuCost, 0, networkCost);    
+    return costFactory.makeCost(inputRows, hashCpuCost + svrCpuCost + mergeCpuCost, 0, networkCost);
   }
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new HashToMergeExchangePrel(getCluster(), traitSet, sole(inputs), distFields, 
+    return new HashToMergeExchangePrel(getCluster(), traitSet, sole(inputs), distFields,
         this.collation, numEndPoints);
   }
-  
+
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     Prel child = (Prel) this.getChild();
-    
+
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
-    
+
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
-    //Currently, only accepts "NONE". For other, requires SelectionVectorRemover
-    childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
-    
-    HashToMergeExchange g = new HashToMergeExchange(childPOP, 
+    HashToMergeExchange g = new HashToMergeExchange(childPOP,
         PrelUtil.getHashExpression(this.distFields, getChild().getRowType()),
         PrelUtil.getOrdering(this.collation, getChild().getRowType()));
-    return g;    
+    return g;
   }
-  
+
   public List<DistributionField> getDistFields() {
     return this.distFields;
   }
-  
+
   public RelCollation getCollation() {
     return this.collation;
   }
-  
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index 1b1a757..d582684 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -41,10 +41,10 @@ import org.eigenbase.reltype.RelDataTypeField;
 import org.eigenbase.rex.RexNode;
 
 
-public class HashToRandomExchangePrel extends SingleRel implements Prel {
+public class HashToRandomExchangePrel extends SinglePrel {
 
   private final List<DistributionField> fields;
-  
+
   public HashToRandomExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, List<DistributionField> fields) {
     super(cluster, traitSet, input);
     this.fields = fields;
@@ -52,33 +52,33 @@ public class HashToRandomExchangePrel extends SingleRel implements Prel {
   }
 
   /**
-   * HashToRandomExchange processes M input rows and hash partitions them 
-   * based on computing a hash value on the distribution fields. 
-   * If there are N nodes (endpoints), we can assume for costing purposes 
-   * on average each sender will send M/N rows to 1 destination endpoint.  
+   * HashToRandomExchange processes M input rows and hash partitions them
+   * based on computing a hash value on the distribution fields.
+   * If there are N nodes (endpoints), we can assume for costing purposes
+   * on average each sender will send M/N rows to 1 destination endpoint.
    * (See DrillCostBase for symbol notations)
-   * C =  CPU cost of hashing k fields of M/N rows 
-   *      + CPU cost of SV remover for M/N rows 
-   *      + Network cost of sending M/N rows to 1 destination. 
-   * So, C = (h * k * M/N) + (s * M/N) + (w * M/N) 
+   * C =  CPU cost of hashing k fields of M/N rows
+   *      + CPU cost of SV remover for M/N rows
+   *      + Network cost of sending M/N rows to 1 destination.
+   * So, C = (h * k * M/N) + (s * M/N) + (w * M/N)
    * Total cost = N * C
    */
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
-    
+
     RelNode child = this.getChild();
     double inputRows = RelMetadataQuery.getRowCount(child);
 
     int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
-    
+
     double hashCpuCost = DrillCostBase.HASH_CPU_COST * inputRows * fields.size();
     double svrCpuCost = DrillCostBase.SVR_CPU_COST * inputRows;
     double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth;
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(inputRows, hashCpuCost + svrCpuCost, 0, networkCost);   
+    return costFactory.makeCost(inputRows, hashCpuCost + svrCpuCost, 0, networkCost);
   }
 
   @Override
@@ -93,9 +93,6 @@ public class HashToRandomExchangePrel extends SingleRel implements Prel {
 
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
-    //Currently, only accepts "NONE". For other, requires SelectionVectorRemover
-    childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
-
     HashToRandomExchange g = new HashToRandomExchange(childPOP, PrelUtil.getHashExpression(this.fields, getChild().getRowType()));
     return g;
   }
@@ -113,4 +110,10 @@ public class HashToRandomExchangePrel extends SingleRel implements Prel {
     return pw;
   }
 
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 1b30d68..406eb65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -39,13 +39,13 @@ import org.eigenbase.rex.RexNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-// abstract base class for the join physical rules  
-public abstract class JoinPruleBase extends RelOptRule {
+// abstract base class for the join physical rules
+public abstract class JoinPruleBase extends Prule {
 
   protected static enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN};
-  
+
   protected JoinPruleBase(RelOptRuleOperand operand, String description) {
-    super(operand, description);   
+    super(operand, description);
   }
 
   protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right) {
@@ -53,7 +53,7 @@ public abstract class JoinPruleBase extends RelOptRule {
       // this indicates a cartesian join which is not supported by existing rules
       return false;
     }
-    
+
     List<Integer> leftKeys = Lists.newArrayList();
     List<Integer> rightKeys = Lists.newArrayList() ;
     RexNode remaining = RelOptUtil.splitJoinCondition(left, right, join.getCondition(), leftKeys, rightKeys);
@@ -63,45 +63,45 @@ public abstract class JoinPruleBase extends RelOptRule {
     }
     return true;
   }
-  
+
   protected List<DistributionField> getDistributionField(List<Integer> keys) {
     List<DistributionField> distFields = Lists.newArrayList();
 
     for (int key : keys) {
       distFields.add(new DistributionField(key));
     }
-     
+
     return distFields;
-  }  
-  
+  }
+
   protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {
     if (! PrelUtil.getPlannerSettings(planner).isBroadcastJoinEnabled()) {
       return false;
     }
 
     double estimatedRightRowCount = RelMetadataQuery.getRowCount(right);
-    if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold() 
+    if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
         && ! left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON)
         ) {
       return true;
     }
     return false;
   }
-  
-  // Create join plan with both left and right children hash distributed. If the physical join type 
-  // is MergeJoin, a collation must be provided for both left and right child and the plan will contain 
-  // sort converter if necessary to provide the collation. 
+
+  // Create join plan with both left and right children hash distributed. If the physical join type
+  // is MergeJoin, a collation must be provided for both left and right child and the plan will contain
+  // sort converter if necessary to provide the collation.
   protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
-      PhysicalJoinType physicalJoinType, 
-      RelNode left, RelNode right, 
+      PhysicalJoinType physicalJoinType,
+      RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException {
- 
+
     DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
     DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
     RelTraitSet traitsLeft = null;
     RelTraitSet traitsRight = null;
-    
-    if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { 
+
+    if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
       assert collationLeft != null && collationRight != null;
       traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(hashLeftPartition);
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(hashRightPartition);
@@ -112,28 +112,28 @@ public abstract class JoinPruleBase extends RelOptRule {
 
     final RelNode convertedLeft = convert(left, traitsLeft);
     final RelNode convertedRight = convert(right, traitsRight);
-    
+
     DrillJoinRelBase newJoin = null;
-    
-    if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { 
-      newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, 
+
+    if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+      newJoin = new HashJoinPrel(join.getCluster(), traitsLeft,
                                  convertedLeft, convertedRight, join.getCondition(),
                                  join.getJoinType());
-      
-    } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { 
-      newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, 
+
+    } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+      newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
                                   convertedLeft, convertedRight, join.getCondition(),
                                   join.getJoinType());
     }
-    call.transformTo(newJoin);    
+    call.transformTo(newJoin);
   }
-  
-  // Create join plan with left child ANY distributed and right child BROADCAST distributed. If the physical join type 
+
+  // Create join plan with left child ANY distributed and right child BROADCAST distributed. If the physical join type
   // is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter
-  // if necessary to provide the collation. 
+  // if necessary to provide the collation.
   protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
-      PhysicalJoinType physicalJoinType, 
-      RelNode left, RelNode right, 
+      PhysicalJoinType physicalJoinType,
+      RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException {
 
     DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
@@ -144,15 +144,15 @@ public abstract class JoinPruleBase extends RelOptRule {
     } else {
       traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
     }
-    
+
     RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
-    RelNode convertedLeft = convert(left, traitsLeft);  
+    RelNode convertedLeft = convert(left, traitsLeft);
     RelNode convertedRight = convert(right, traitsRight);
 
     traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
 
     DrillJoinRelBase newJoin = null;
-    
+
     if (convertedLeft instanceof RelSubset) {
       RelSubset subset = (RelSubset) convertedLeft;
       for (RelNode rel : subset.getRelList()) {
@@ -163,7 +163,7 @@ public abstract class JoinPruleBase extends RelOptRule {
           } else {
             traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
           }
-          
+
           RelNode newLeft = convert(left, traitsLeft);
           if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
             newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
@@ -177,5 +177,5 @@ public abstract class JoinPruleBase extends RelOptRule {
       }
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 7c8d767..5986fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.logical.data.LogicalOperator;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptCluster;
@@ -67,6 +69,26 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
     return limit;
   }
 
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.NONE_AND_TWO;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
 //  @Override
 //  public LogicalOperator implement(DrillImplementor implementor) {
 //    LogicalOperator inputOp = implementor.visitChild(this, 0, getChild());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrule.java
index dbea251..aad2976 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrule.java
@@ -27,19 +27,19 @@ import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
 
-public class LimitPrule extends RelOptRule{
+public class LimitPrule extends Prule{
   public static final RelOptRule INSTANCE = new LimitPrule();
 
-  
+
   public LimitPrule() {
-    super(RelOptHelper.any(DrillLimitRel.class, DrillRel.DRILL_LOGICAL), "Prel.LimitPrule");    
+    super(RelOptHelper.any(DrillLimitRel.class, DrillRel.DRILL_LOGICAL), "Prel.LimitPrule");
   }
-  
+
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillLimitRel limit = (DrillLimitRel) call.rel(0);
     final RelNode input = limit.getChild();
-    
+
     final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
     final RelNode convertedInput = convert(input, traits);
     LimitPrel newLimit = new LimitPrel(limit.getCluster(), limit.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput, limit.getOffset(), limit.getFetch());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index 2dbf159..fe03c40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
@@ -78,7 +79,7 @@ public class MergeJoinPrel  extends DrillJoinRelBase implements Prel {
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
     double leftRowCount = RelMetadataQuery.getRowCount(this.getLeft());
     double rightRowCount = RelMetadataQuery.getRowCount(this.getRight());
@@ -86,11 +87,11 @@ public class MergeJoinPrel  extends DrillJoinRelBase implements Prel {
     double joinConditionCost = DrillCostBase.COMPARE_CPU_COST * this.getLeftKeys().size();
     double cpuCost = joinConditionCost * (leftRowCount + rightRowCount);
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(leftRowCount + rightRowCount, cpuCost, 0, 0);    
+    return costFactory.makeCost(leftRowCount + rightRowCount, cpuCost, 0, 0);
   }
 
-  @Override  
-  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {    
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     final List<String> fields = getRowType().getFieldNames();
     assert isUnique(fields);
     final int leftCount = left.getRowType().getFieldCount();
@@ -100,12 +101,6 @@ public class MergeJoinPrel  extends DrillJoinRelBase implements Prel {
     PhysicalOperator leftPop = implementInput(creator, 0, left);
     PhysicalOperator rightPop = implementInput(creator, leftCount, right);
 
-    //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
-    leftPop = PrelUtil.removeSvIfRequired(leftPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
-
-    //Currently, only accepts "NONE" or "SV2". For other, requires SelectionVectorRemover
-    rightPop = PrelUtil.removeSvIfRequired(rightPop, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
-
     JoinRelType jtype = this.getJoinType();
 
     List<JoinCondition> conditions = Lists.newArrayList();
@@ -154,9 +149,6 @@ public class MergeJoinPrel  extends DrillJoinRelBase implements Prel {
   private PhysicalOperator rename(PhysicalPlanCreator creator, PhysicalOperator inputOp, List<String> inputFields, List<String> outputFields) {
     List<NamedExpression> exprs = Lists.newArrayList();
 
-    //Currently, Project only accepts "NONE". For other, requires SelectionVectorRemover
-    inputOp = PrelUtil.removeSvIfRequired(inputOp, SelectionVectorMode.NONE);
-
     for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
       exprs.add(new NamedExpression(new FieldReference(pair.left), new FieldReference(pair.right)));
     }
@@ -166,5 +158,24 @@ public class MergeJoinPrel  extends DrillJoinRelBase implements Prel {
     return proj;
   }
 
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getLeft(), getRight());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.NONE_AND_TWO;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedPartitionExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedPartitionExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedPartitionExchangePrel.java
index 2347db1..f551a27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedPartitionExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/OrderedPartitionExchangePrel.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.SingleRel;
 import org.eigenbase.rel.metadata.RelMetadataQuery;
@@ -31,7 +33,7 @@ import org.eigenbase.relopt.RelOptCost;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 
-public class OrderedPartitionExchangePrel extends SingleRel implements Prel {
+public class OrderedPartitionExchangePrel extends SinglePrel {
 
   public OrderedPartitionExchangePrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
     super(cluster, traitSet, input);
@@ -41,27 +43,31 @@ public class OrderedPartitionExchangePrel extends SingleRel implements Prel {
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
-    }    
+      return super.computeSelfCost(planner).multiplyBy(.1);
+    }
     RelNode child = this.getChild();
     double inputRows = RelMetadataQuery.getRowCount(child);
 
     int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
-    
+
     double rangePartitionCpuCost = DrillCostBase.RANGE_PARTITION_CPU_COST * inputRows;
     double svrCpuCost = DrillCostBase.SVR_CPU_COST * inputRows;
     double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth;
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(inputRows, rangePartitionCpuCost + svrCpuCost, 0, networkCost);  
+    return costFactory.makeCost(inputRows, rangePartitionCpuCost + svrCpuCost, 0, networkCost);
   }
 
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     return new OrderedPartitionExchangePrel(getCluster(), traitSet, sole(inputs));
   }
-  
+
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     throw new IOException(this.getClass().getSimpleName() + " not supported yet!");
   }
-  
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
index e0dace1..a6c6b7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
@@ -21,14 +21,20 @@ import java.io.IOException;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.relopt.Convention;
 
-public interface Prel extends DrillRelNode {
+public interface Prel extends DrillRelNode, Iterable<Prel>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Prel.class);
-  
+
   final static Convention DRILL_PHYSICAL = new Convention.Impl("PHYSICAL", Prel.class);
-  
+
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException;
-  
-    
+
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E;
+
+  public SelectionVectorMode[] getSupportedEncodings();
+  public SelectionVectorMode getEncoding();
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index 9ca9fbb..1cd480c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -35,8 +37,11 @@ import org.apache.drill.exec.planner.physical.DrillDistributionTrait.Distributio
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptCluster;
 import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
 import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.rex.RexCall;
 import org.eigenbase.rex.RexInputRef;
@@ -84,20 +89,24 @@ public class PrelUtil {
     return func;
   }
 
+  public static Iterator<Prel> iter(RelNode... nodes){
+    return (Iterator<Prel>) (Object) Arrays.asList(nodes).iterator();
+  }
+
   public static PlannerSettings getSettings(RelOptCluster cluster){
     return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class);
   }
-  
+
   public static PlannerSettings getPlannerSettings(RelOptPlanner planner) {
     return planner.getFrameworkContext().unwrap(PlannerSettings.class);
   }
 
-  public static PhysicalOperator removeSvIfRequired(PhysicalOperator child, SelectionVectorMode... allowed){
-    SelectionVectorMode current = child.getSVMode();
+  public static Prel removeSvIfRequired(Prel prel, SelectionVectorMode... allowed){
+    SelectionVectorMode current = prel.getEncoding();
     for(SelectionVectorMode m : allowed){
-      if(current == m) return child;
+      if(current == m) return prel;
     }
-    return new SelectionVectorRemover(child);
+    return new SelectionVectorRemoverPrel(prel);
   }
 
   public static List<SchemaPath> getColumns(RelDataType rowType, List<RexNode> projects) {
@@ -176,4 +185,15 @@ public class PrelUtil {
 
   }
 
+  public static RelTraitSet fixTraits(RelOptRuleCall call, RelTraitSet set){
+    return fixTraits(call.getPlanner(), set);
+  }
+
+  public static RelTraitSet fixTraits(RelOptPlanner cluster, RelTraitSet set){
+    if(getPlannerSettings(cluster).isSingleMode()){
+      return set.replace(DrillDistributionTrait.ANY);
+    }else{
+      return set;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelVisitor.java
new file mode 100644
index 0000000..ab22b97
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelVisitor.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+
+public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PrelVisitor.class);
+
+  public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index c7342aa..f76251f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -54,13 +56,30 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
 
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
-    //Currently, Project only accepts "NONE". For other, requires SelectionVectorRemover
-    childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
-
     Project p = new Project(this.getProjectExpressions(new DrillParseContext()),  childPOP);
 
     return p;
   }
 
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 358c485..02e6d44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -48,7 +48,7 @@ import org.eigenbase.sql.SqlKind;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-public class ProjectPrule extends RelOptRule {
+public class ProjectPrule extends Prule {
   public static final RelOptRule INSTANCE = new ProjectPrule();
 
   private ProjectPrule() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prule.java
new file mode 100644
index 0000000..c0d6516
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelOptRuleOperand;
+import org.eigenbase.relopt.RelTraitSet;
+
+public abstract class Prule extends RelOptRule{
+  public Prule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  public Prule(RelOptRuleOperand operand) {
+    super(operand);
+  }
+
+
+  public static RelNode convert(RelNode rel, RelTraitSet toTraits){
+
+    PlannerSettings settings = PrelUtil.getSettings(rel.getCluster());
+    if(settings.isSingleMode()){
+      toTraits = toTraits.replace(DrillDistributionTrait.ANY);
+    }
+
+    return RelOptRule.convert(rel, toTraits);
+  }
+
+  public static boolean isSingleMode(RelOptRuleCall call){
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isSingleMode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
index 4c5cf33..6d318ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
@@ -23,8 +23,8 @@ import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.rex.RexLiteral;
 
-public class PushLimitToTopN  extends RelOptRule{
-  
+public class PushLimitToTopN  extends Prule{
+
   public static final RelOptRule INSTANCE = new PushLimitToTopN();
 
   private PushLimitToTopN() {
@@ -36,16 +36,16 @@ public class PushLimitToTopN  extends RelOptRule{
     final LimitPrel limit = (LimitPrel) call.rel(0);
     final SingleMergeExchangePrel smex = (SingleMergeExchangePrel) call.rel(1);
     final SortPrel sort = (SortPrel) call.rel(2);
-    
+
     // First offset to include into results (inclusive). Null implies it is starting from offset 0
     int offset = limit.getOffset() != null ? Math.max(0, RexLiteral.intValue(limit.getOffset())) : 0;
     int fetch = limit.getFetch() != null?  Math.max(0, RexLiteral.intValue(limit.getFetch())) : 0;
-        
-    final TopNPrel topN = new TopNPrel(limit.getCluster(), sort.getTraitSet(), sort.getChild(), offset + fetch, sort.getCollation()); 
-    final LimitPrel newLimit = new LimitPrel(limit.getCluster(), limit.getTraitSet(), 
-        new SingleMergeExchangePrel(smex.getCluster(), smex.getTraitSet(), topN, sort.getCollation()), 
+
+    final TopNPrel topN = new TopNPrel(limit.getCluster(), sort.getTraitSet(), sort.getChild(), offset + fetch, sort.getCollation());
+    final LimitPrel newLimit = new LimitPrel(limit.getCluster(), limit.getTraitSet(),
+        new SingleMergeExchangePrel(smex.getCluster(), smex.getTraitSet(), topN, sort.getCollation()),
         limit.getOffset(), limit.getFetch());
-    
+
     call.transformTo(newLimit);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 672dd27..74cd7a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.OperatorCost;
@@ -98,29 +100,49 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
   public double getRows() {
     return this.groupScan.getSize().getRecordCount();
   }
-  
+
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     Size scanSize = this.groupScan.getSize();
-    int columnCount = this.getRowType().getFieldCount();   
-    
+    int columnCount = this.getRowType().getFieldCount();
+
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
       OperatorCost scanCost = this.groupScan.getCost();
       return planner.getCostFactory().makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(), scanCost.getDisk());
     }
-    
+
     // double rowCount = RelMetadataQuery.getRowCount(this);
     double rowCount = scanSize.getRecordCount();
-    
-    double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. 
+
+    double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count.
     // Even though scan is reading from disk, in the currently generated plans all plans will
-    // need to read the same amount of data, so keeping the disk io cost 0 is ok for now.  
-    // In the future we might consider alternative scans that go against projections or 
+    // need to read the same amount of data, so keeping the disk io cost 0 is ok for now.
+    // In the future we might consider alternative scans that go against projections or
     // different compression schemes etc that affect the amount of data read. Such alternatives
-    // would affect both cpu and io cost. 
+    // would affect both cpu and io cost.
     double ioCost = 0;
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(rowCount, cpuCost, ioCost, 0);   
+    return costFactory.makeCost(rowCount, cpuCost, ioCost, 0);
   }
-  
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index 9e3bac0..606bbcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -27,7 +27,7 @@ import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
 
-public class ScanPrule extends RelOptRule{
+public class ScanPrule extends Prule{
   public static final RelOptRule INSTANCE = new ScanPrule();
 
   public ScanPrule() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index c3b452f..36bf796 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -49,11 +50,28 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel {
 
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
-    //Currently, Screen only accepts "NONE". For other, requires SelectionVectorRemover
-    childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
-
     Screen s = new Screen(childPOP, creator.getContext().getCurrentEndpoint());
     return s;
   }
 
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
index 5b549d5..9cb8e15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrule.java
@@ -26,19 +26,19 @@ import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
 
-public class ScreenPrule extends RelOptRule{
+public class ScreenPrule extends Prule{
   public static final RelOptRule INSTANCE = new ScreenPrule();
 
-  
+
   public ScreenPrule() {
-    super(RelOptHelper.some(DrillScreenRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.ScreenPrule");    
+    super(RelOptHelper.some(DrillScreenRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.ScreenPrule");
   }
-  
+
   @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillScreenRelBase screen = (DrillScreenRelBase) call.rel(0);
     final RelNode input = call.rel(1);
-    
+
     final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
     final RelNode convertedInput = convert(input, traits);
     DrillScreenRelBase newScreen = new ScreenPrel(screen.getCluster(), screen.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorPrelVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorPrelVisitor.java
new file mode 100644
index 0000000..aa7e91c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorPrelVisitor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+
+import com.google.common.collect.Lists;
+
+
+public class SelectionVectorPrelVisitor implements PrelVisitor<Prel, Void, RuntimeException>{
+
+  private static SelectionVectorPrelVisitor INSTANCE = new SelectionVectorPrelVisitor();
+
+  public static Prel addSelectionRemoversWhereNecessary(Prel prel){
+    return prel.accept(INSTANCE, null);
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    SelectionVectorMode[] encodings = prel.getSupportedEncodings();
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      child = child.accept(this, null);
+      children.add(convert(encodings, child));
+    }
+
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  private Prel convert(SelectionVectorMode[] encodings, Prel prel){
+    for(SelectionVectorMode m : encodings){
+      if(prel.getEncoding() == m) return prel;
+    }
+    return new SelectionVectorRemoverPrel(prel);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
new file mode 100644
index 0000000..63cdcaa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+public class SelectionVectorRemoverPrel extends SinglePrel{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorRemoverPrel.class);
+
+  public SelectionVectorRemoverPrel(Prel child){
+    super(child.getCluster(), child.getTraitSet(), child);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    return new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator));
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
index 29ed511..9b93058 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
@@ -38,7 +38,7 @@ import org.eigenbase.relopt.RelOptCost;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 
-public class SingleMergeExchangePrel extends SingleRel implements Prel {
+public class SingleMergeExchangePrel extends SinglePrel {
 
   private final RelCollation collation ;
 
@@ -48,32 +48,32 @@ public class SingleMergeExchangePrel extends SingleRel implements Prel {
     assert input.getConvention() == Prel.DRILL_PHYSICAL;
   }
 
-  /**    
-   * A SingleMergeExchange processes a total of M rows coming from N 
-   * sorted input streams (from N senders) and merges them into a single 
+  /**
+   * A SingleMergeExchange processes a total of M rows coming from N
+   * sorted input streams (from N senders) and merges them into a single
    * output sorted stream. For costing purposes we can assume each sender
-   * is sending M/N rows to a single receiver.   
+   * is sending M/N rows to a single receiver.
    * (See DrillCostBase for symbol notations)
-   * C =  CPU cost of SV remover for M/N rows 
-   *     + Network cost of sending M/N rows to 1 destination. 
-   * So, C = (s * M/N) + (w * M/N) 
+   * C =  CPU cost of SV remover for M/N rows
+   *     + Network cost of sending M/N rows to 1 destination.
+   * So, C = (s * M/N) + (w * M/N)
    * Cost of merging M rows coming from N senders = (M log2 N) * c
-   * Total cost = N * C + (M log2 N) * c 
-   */  
+   * Total cost = N * C + (M log2 N) * c
+   */
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
-      return super.computeSelfCost(planner).multiplyBy(.1); 
+      return super.computeSelfCost(planner).multiplyBy(.1);
     }
     RelNode child = this.getChild();
     double inputRows = RelMetadataQuery.getRowCount(child);
-    int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;    
+    int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
     double svrCpuCost = DrillCostBase.SVR_CPU_COST * inputRows;
     double networkCost = DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth;
-    int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints(); 
+    int numEndPoints = PrelUtil.getSettings(getCluster()).numEndPoints();
     double mergeCpuCost = DrillCostBase.COMPARE_CPU_COST * inputRows * (Math.log(numEndPoints)/Math.log(2));
     DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
-    return costFactory.makeCost(inputRows, svrCpuCost + mergeCpuCost, 0, networkCost);   
+    return costFactory.makeCost(inputRows, svrCpuCost + mergeCpuCost, 0, networkCost);
   }
 
   @Override
@@ -88,9 +88,6 @@ public class SingleMergeExchangePrel extends SingleRel implements Prel {
 
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
-    //Currently, only accepts "NONE". For other, requires SelectionVectorRemover
-    childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
-
     SingleMergeExchange g = new SingleMergeExchange(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()));
     return g;
   }
@@ -108,4 +105,12 @@ public class SingleMergeExchangePrel extends SingleRel implements Prel {
     return pw;
   }
 
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d468f6d6/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java
new file mode 100644
index 0000000..e47dc7f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+
+public abstract class SinglePrel extends SingleRel implements Prel{
+
+  public SinglePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+}


Mime
View raw message