drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [02/24] git commit: wip to support op numbering throughout exec.
Date Thu, 22 May 2014 01:14:39 GMT
wip to support op numbering throughout exec.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ebf3d340
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ebf3d340
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ebf3d340

Branch: refs/heads/diagnostics2
Commit: ebf3d340497afeceb93d7e7c8211c5eebfce9ebf
Parents: e14a38c
Author: Jacques Nadeau <jacques@apache.org>
Authored: Fri May 16 10:52:15 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon May 19 09:11:22 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/fragment/Materializer.java     | 27 +++++++-----
 .../planner/physical/BroadcastExchangePrel.java |  2 +
 .../drill/exec/planner/physical/FilterPrel.java |  2 +-
 .../exec/planner/physical/HashAggPrel.java      |  2 +
 .../exec/planner/physical/HashJoinPrel.java     |  1 +
 .../physical/HashToMergeExchangePrel.java       |  1 +
 .../physical/HashToRandomExchangePrel.java      |  2 +
 .../drill/exec/planner/physical/LimitPrel.java  |  1 +
 .../exec/planner/physical/MergeJoinPrel.java    |  1 +
 .../planner/physical/PhysicalPlanCreator.java   | 14 +++++-
 .../exec/planner/physical/ProjectPrel.java      |  1 +
 .../drill/exec/planner/physical/ScanPrel.java   |  1 +
 .../drill/exec/planner/physical/ScreenPrel.java |  2 +
 .../physical/SelectionVectorRemoverPrel.java    |  5 ++-
 .../physical/SingleMergeExchangePrel.java       |  2 +
 .../drill/exec/planner/physical/SortPrel.java   |  1 +
 .../exec/planner/physical/StreamAggPrel.java    |  2 +-
 .../drill/exec/planner/physical/TopNPrel.java   |  1 +
 .../planner/physical/UnionExchangePrel.java     |  1 +
 .../drill/exec/planner/physical/WriterPrel.java |  5 ++-
 .../planner/physical/explain/PrelSequencer.java | 46 +++++++++++++++-----
 .../planner/sql/handlers/DefaultSqlHandler.java |  3 +-
 .../apache/drill/exec/TestOpSerialization.java  | 10 +++--
 23 files changed, 102 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 87078a2..ef9146a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -30,21 +30,23 @@ import com.google.common.collect.Lists;
 public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode,
ExecutionSetupException>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class);
 
-  
+
   @Override
   public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws
ExecutionSetupException {
     iNode.addAllocation(exchange);
     if(exchange == iNode.getNode().getSendingExchange()){
-      
+
       // this is a sending exchange.
       PhysicalOperator child = exchange.getChild().accept(this, iNode);
       PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(),
child);
+      materializedSender.setOperatorId(0);
 //      logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender,
child);
       return materializedSender;
-      
+
     }else{
       // receiving exchange.
       PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
+      materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId());
 //      logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
       return materializedReceiver;
     }
@@ -52,7 +54,9 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator,
Mate
 
   @Override
   public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode)
throws ExecutionSetupException {
-    return groupScan.getSpecificScan(iNode.getMinorFragmentId());
+    PhysicalOperator child = groupScan.getSpecificScan(iNode.getMinorFragmentId());
+    child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId());
+    return child;
   }
 
   @Override
@@ -67,9 +71,10 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator,
Mate
     PhysicalOperator child = store.getChild().accept(this, iNode);
 
     iNode.addAllocation(store);
-    
+
     try {
       PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
+      o.setOperatorId(Short.MAX_VALUE & store.getOperatorId());
 //      logger.debug("New materialized store node {} with child {}", o, child);
       return o;
     } catch (PhysicalOperatorSetupException e) {
@@ -85,13 +90,15 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator,
Mate
     for(PhysicalOperator child : op){
       children.add(child.accept(this, iNode));
     }
-    return op.getNewWithChildren(children);
+    PhysicalOperator newOp = op.getNewWithChildren(children);
+    newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId());
+    return newOp;
   }
-  
+
   public static class IndexedFragmentNode{
     final Wrapper info;
     final int minorFragmentId;
-    
+
     public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
       super();
       this.info = info;
@@ -113,7 +120,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator,
Mate
     public void addAllocation(PhysicalOperator pop) {
       info.addAllocation(pop);
     }
-    
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index e0f3ee1..8b1c720 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -89,6 +89,8 @@ public class BroadcastExchangePrel extends ExchangePrel{
     }
 
     BroadcastExchange g = new BroadcastExchange(childPOP);
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 8420e08..9632911 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -56,7 +56,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f);
-
+    p.setOperatorId(creator.getOperatorId(this));
     return p;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b2378be..6377e35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -110,6 +110,8 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
         exprs.toArray(new NamedExpression[exprs.size()]),
         1.0f);
 
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 1a528d5..87da31e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -106,6 +106,7 @@ public class HashJoinPrel  extends DrillJoinRelBase implements Prel {
     }
 
     HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
+    hjoin.setOperatorId(creator.getOperatorId(this));
 
     return hjoin;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
index 262fd8c..0539a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
@@ -90,6 +90,7 @@ public class HashToMergeExchangePrel extends ExchangePrel {
     HashToMergeExchange g = new HashToMergeExchange(childPOP,
         PrelUtil.getHashExpression(this.distFields, getChild().getRowType()),
         PrelUtil.getOrdering(this.collation, getChild().getRowType()));
+    g.setOperatorId(creator.getOperatorId(this));
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index ec9ed79..a5699cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -94,6 +94,8 @@ public class HashToRandomExchangePrel extends ExchangePrel {
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
     HashToRandomExchange g = new HashToRandomExchange(childPOP, PrelUtil.getHashExpression(this.fields,
getChild().getRowType()));
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5986fde..794593a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -65,6 +65,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
     Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
 
     Limit limit = new Limit(childPOP, first, last);
+    limit.setOperatorId(creator.getOperatorId(this));
 
     return limit;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index fe03c40..400c6a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -110,6 +110,7 @@ public class MergeJoinPrel  extends DrillJoinRelBase implements Prel {
     }
 
     MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype);
+    mjoin.setOperatorId(creator.getOperatorId(this));
 
     return mjoin;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
index 9ac07d3..f4189e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.logical.PlanProperties;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
@@ -27,20 +28,24 @@ import org.apache.drill.common.logical.PlanProperties.PlanType;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId;
 
 import com.google.common.collect.Lists;
+import com.google.hive12.common.collect.Maps;
 
 
 public class PhysicalPlanCreator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanCreator.class);
 
+  private final Map<Prel, OpId> opIdMap;
+
   private List<PhysicalOperator> popList;
   private final QueryContext context;
   PhysicalPlan plan = null;
 
-  public PhysicalPlanCreator(QueryContext context) {
+  public PhysicalPlanCreator(QueryContext context, Map<Prel, OpId> opIdMap) {
     this.context = context;
+    this.opIdMap = opIdMap;
     popList = Lists.newArrayList();
   }
 
@@ -48,6 +53,11 @@ public class PhysicalPlanCreator {
     return context;
   }
 
+  public int getOperatorId(Prel prel){
+    OpId id = opIdMap.get(prel);
+    return id.getAsSingleInt();
+  }
+
   public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) {
 
     if (plan != null && !forceRebuild) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 1aa34d3..70dca25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -57,6 +57,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Project p = new Project(this.getProjectExpressions(new DrillParseContext()),  childPOP);
+    p.setOperatorId(creator.getOperatorId(this));
 
     return p;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 74cd7a9..8461e24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -73,6 +73,7 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
       throws IOException {
+    groupScan.setOperatorId(creator.getOperatorId(this));
     return groupScan;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index 36bf796..d02ed44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -51,6 +51,8 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Screen s = new Screen(childPOP, creator.getContext().getCurrentEndpoint());
+    s.setOperatorId(creator.getOperatorId(this));
+
     return s;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index 63cdcaa..fd07749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -32,7 +32,10 @@ public class SelectionVectorRemoverPrel extends SinglePrel{
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException
{
-    return new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator));
+    SelectionVectorRemover r =  new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator));
+    r.setOperatorId(creator.getOperatorId(this));
+    return r;
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
index 05d6e89..99d99a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
@@ -89,6 +89,8 @@ public class SingleMergeExchangePrel extends ExchangePrel {
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
     SingleMergeExchange g = new SingleMergeExchange(childPOP, PrelUtil.getOrdering(this.collation,
getChild().getRowType()));
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index d582bc6..fa5e900 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -73,6 +73,7 @@ public class SortPrel extends SortRel implements Prel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()),
false);
+    g.setOperatorId(creator.getOperatorId(this));
 
     return g;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 5fb758a..a95d926 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -109,7 +109,7 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{
 
     Prel child = (Prel) this.getChild();
     StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new
NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f);
-
+    g.setOperatorId(creator.getOperatorId(this));
     return g;
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 3c8cfe0..0067926 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -61,6 +61,7 @@ public class TopNPrel extends SinglePrel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     TopN topN = new TopN(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()),
false, this.limit);
+    topN.setOperatorId(creator.getOperatorId(this));
 
     return topN;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
index 5d6b85d..f14df72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
@@ -81,6 +81,7 @@ public class UnionExchangePrel extends ExchangePrel {
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
     UnionExchange g = new UnionExchange(childPOP);
+    g.setOperatorId(creator.getOperatorId(this));
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index 4cefeb5..e948125 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -43,7 +43,10 @@ public class WriterPrel extends DrillWriterRelBase implements Prel {
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException
{
     Prel child = (Prel) this.getChild();
-    return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
+    PhysicalOperator g = getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
+    g.setOperatorId(creator.getOperatorId(this));
+
+    return g;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
index 169deca..2ab6c74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
@@ -42,14 +42,17 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag,
Runt
       if (rel == null) {
         return null;
       }
-      PrelSequencer s = new PrelSequencer();
       final StringWriter sw = new StringWriter();
-      final RelWriter planWriter = new NumberingRelWriter(s.go(rel), new PrintWriter(sw),
explainlevel);
+      final RelWriter planWriter = new NumberingRelWriter(getIdMap(rel), new PrintWriter(sw),
explainlevel);
       rel.explain(planWriter);
       return sw.toString();
 
   }
 
+  public static Map<Prel, OpId> getIdMap(Prel rel){
+    PrelSequencer s = new PrelSequencer();
+    return s.go(rel);
+  }
 
 
   static class Frag implements Iterable<Frag>{
@@ -110,7 +113,7 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag,
Runt
 
   }
 
-  static class OpId{
+  public static class OpId{
     int fragmentId;
     int opId;
     public OpId(int fragmentId, int opId) {
@@ -118,6 +121,21 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag,
Runt
       this.fragmentId = fragmentId;
       this.opId = opId;
     }
+
+
+    public int getFragmentId() {
+      return fragmentId;
+    }
+
+
+    public int getOpId() {
+      return opId;
+    }
+
+    public int getAsSingleInt(){
+      return (fragmentId << 16) + opId;
+    }
+
     @Override
     public int hashCode() {
       final int prime = 31;
@@ -172,19 +190,27 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag,
Runt
     }
 
     // for each fragment, do a dfs of operators to assign operator ids.
-    Map<Prel, OpId> ids = Maps.newHashMap();
+    Map<Prel, OpId> ids = Maps.newIdentityHashMap();
+
+    ids.put(rootFrag.root, new OpId(0, 0));
     for(Frag f : frags){
-      int id = 0;
+      int id = 1;
       Queue<Prel> ops = Lists.newLinkedList();
       ops.add(f.root);
       while(!ops.isEmpty()){
         Prel p = ops.remove();
-        if(p instanceof ExchangePrel && p != f.root) continue;
-        ids.put(p, new OpId(f.majorFragmentId, id++) );
+        boolean isExchange = p instanceof ExchangePrel;
+
+        if(p != f.root){      // we account for exchanges as receviers to guarantee unique
identifiers.
+          ids.put(p, new OpId(f.majorFragmentId, id++) );
+        }
+
 
-        List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator()));
-        for(Prel child : children){
-          ops.add(child);
+        if(!isExchange || p == f.root){
+          List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator()));
+          for(Prel child : children){
+            ops.add(child);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 1cb3cfb..b06432a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -132,8 +132,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
   }
 
   protected PhysicalOperator convertToPop(Prel prel) throws IOException {
-
-    PhysicalPlanCreator creator = new PhysicalPlanCreator(context);
+    PhysicalPlanCreator creator = new PhysicalPlanCreator(context, PrelSequencer.getIdMap(prel));
     PhysicalOperator op = prel.getPhysicalOperator(creator);
     return op;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
index 3040de2..ad1d6b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -12,6 +12,7 @@ import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.UnionExchange;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.store.mock.MockSubScanPOP;
@@ -27,10 +28,12 @@ public class TestOpSerialization {
     DrillConfig c = DrillConfig.create();
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     MockSubScanPOP s = new MockSubScanPOP("abc", null);
-    s.setOperatorId(2);
+    s.setOperatorId(3);
     Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN),
0.1f);
-    f.setOperatorId(1);
-    Screen screen = new Screen(f, CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    f.setOperatorId(2);
+    UnionExchange e = new UnionExchange(f);
+    e.setOperatorId(1);
+    Screen screen = new Screen(e, CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
     screen.setOperatorId(0);
 
     boolean reversed = false;
@@ -38,6 +41,7 @@ public class TestOpSerialization {
 
       List<PhysicalOperator> pops = Lists.newArrayList();
       pops.add(s);
+      pops.add(e);
       pops.add(f);
       pops.add(screen);
 


Mime
View raw message