drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [09/24] git commit: status changes
Date Thu, 22 May 2014 01:14:46 GMT
status changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5472140a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5472140a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5472140a

Branch: refs/heads/diagnostics2
Commit: 5472140ab228376ed6e205e60edec473771b9dba
Parents: 73d3bd0
Author: Jacques Nadeau <jacques@apache.org>
Authored: Mon May 19 07:57:25 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon May 19 09:12:09 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/types/Types.java    |    5 +-
 exec/java-exec/pom.xml                          |   58 +-
 .../apache/drill/exec/client/DrillClient.java   |    2 +-
 .../drill/exec/client/QuerySubmitter.java       |   10 +-
 .../apache/drill/exec/ops/FragmentStats.java    |   37 +-
 .../org/apache/drill/exec/ops/MetricDef.java    |    6 +
 .../org/apache/drill/exec/ops/OpProfileDef.java |   20 +
 .../apache/drill/exec/ops/OperatorContext.java  |   25 +
 .../apache/drill/exec/ops/OperatorStats.java    |  117 +-
 .../exec/physical/base/AbstractExchange.java    |   12 +-
 .../exec/physical/base/AbstractGroupScan.java   |    4 +
 .../exec/physical/base/PhysicalOperator.java    |    3 +
 .../exec/physical/config/BroadcastSender.java   |    9 +
 .../exec/physical/config/ExternalSort.java      |   19 +-
 .../drill/exec/physical/config/Filter.java      |   13 +-
 .../exec/physical/config/HashAggregate.java     |   30 +-
 .../drill/exec/physical/config/HashJoinPOP.java |    8 +-
 .../physical/config/HashPartitionSender.java    |    9 +-
 .../exec/physical/config/IteratorValidator.java |    7 +-
 .../drill/exec/physical/config/Limit.java       |    5 +
 .../exec/physical/config/MergeJoinPOP.java      |    6 +
 .../physical/config/MergingReceiverPOP.java     |    5 +
 .../physical/config/OrderedPartitionSender.java |    9 +-
 .../drill/exec/physical/config/Project.java     |   12 +-
 .../exec/physical/config/RandomReceiver.java    |   14 +-
 .../drill/exec/physical/config/RangeSender.java |   10 +-
 .../drill/exec/physical/config/Screen.java      |    7 +-
 .../physical/config/SelectionVectorRemover.java |    8 +-
 .../exec/physical/config/SingleSender.java      |   13 +-
 .../apache/drill/exec/physical/config/Sort.java |   20 +-
 .../physical/config/StreamingAggregate.java     |   13 +-
 .../apache/drill/exec/physical/config/TopN.java |   18 +-
 .../drill/exec/physical/config/Trace.java       |    6 +
 .../drill/exec/physical/config/Union.java       |    6 +
 .../exec/physical/impl/WriterRecordBatch.java   |  153 +-
 .../physical/impl/aggregate/HashAggBatch.java   |  126 +-
 .../impl/aggregate/StreamingAggBatch.java       |  141 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |   46 +-
 .../exec/physical/impl/join/JoinStatus.java     |   19 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |   81 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |   50 +-
 .../OrderedPartitionRecordBatch.java            |  173 +-
 .../drill/exec/record/AbstractRecordBatch.java  |   27 +-
 .../exec/record/AbstractSingleRecordBatch.java  |   68 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   20 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   16 +-
 .../drill/exec/store/dfs/easy/EasySubScan.java  |   17 +-
 .../drill/exec/store/dfs/easy/EasyWriter.java   |    5 +
 .../drill/exec/store/direct/DirectSubScan.java  |    7 +
 .../exec/store/easy/json/JSONFormatPlugin.java  |   14 +
 .../exec/store/easy/text/TextFormatPlugin.java  |   15 +-
 .../drill/exec/store/hive/HiveSubScan.java      |    7 +
 .../exec/store/ischema/InfoSchemaSubScan.java   |   13 +-
 .../drill/exec/store/mock/MockStorePOP.java     |    8 +-
 .../drill/exec/store/mock/MockSubScanPOP.java   |    8 +-
 .../exec/store/parquet/ParquetRowGroupScan.java |    6 +
 .../drill/exec/store/parquet/ParquetWriter.java |    8 +
 .../apache/drill/exec/work/foreman/Foreman.java |    4 +-
 .../drill/exec/work/foreman/FragmentData.java   |   43 +
 .../drill/exec/work/foreman/QueryManager.java   |  106 +-
 .../drill/exec/work/foreman/QueryStatus.java    |   33 +
 .../exec/physical/impl/PerformanceTests.java    |   31 -
 .../physical/impl/TestBroadcastExchange.java    |   19 +-
 .../impl/TestExecutionAbstractions.java         |  225 -
 .../drill/exec/server/DrillClientFactory.java   |   21 +
 .../apache/drill/exec/server/HelloResource.java |   22 +
 .../apache/drill/exec/server/TestJersey.java    |   48 +
 .../drill/exec/server/rest/RootResource.java    |    9 +
 exec/java-exec/src/test/resources/logback.xml   |    2 +-
 .../org/apache/drill/jdbc/DrillResultSet.java   |   26 +-
 .../apache/drill/common/types/TypeProtos.java   |  117 +-
 .../org/apache/drill/exec/proto/BitControl.java |  245 +-
 .../apache/drill/exec/proto/UserBitShared.java  | 6290 +++++++++++++++++-
 .../org/apache/drill/exec/proto/UserProtos.java |  184 +-
 protocol/src/main/protobuf/BitControl.proto     |    1 +
 protocol/src/main/protobuf/User.proto           |    8 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   88 +
 77 files changed, 7963 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 5d0812f..8a826b1 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -113,8 +113,6 @@ public class Types {
     case INTERVALYEAR:
     case INTERVALDAY:
     case LATE:
-    case REPEATMAP:
-      return java.sql.Types.OTHER;
     case SMALLINT:
       return java.sql.Types.SMALLINT;
     case TIME:
@@ -226,7 +224,6 @@ public class Types {
     case LATE:
       return Comparability.UNKNOWN;
     case MAP:
-    case REPEATMAP:
       return Comparability.NONE;
     case BIT:
       return Comparability.EQUAL;
@@ -274,7 +271,7 @@ public class Types {
   public static MajorType withScaleAndPrecision(MinorType type, DataMode mode, int scale, int precision) {
     return MajorType.newBuilder().setMinorType(type).setMode(mode).setScale(scale).setPrecision(precision).build();
   }
-  
+
   public static MajorType required(MinorType type){
     return MajorType.newBuilder().setMode(DataMode.REQUIRED).setMinorType(type).build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index e282c00..d693630 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -72,6 +72,26 @@
       <version>2.7.3</version>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>9.1.5.v20140505</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>9.1.5.v20140505</version>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-jetty-servlet</artifactId>
+      <version>2.8</version>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-json-jackson</artifactId>
+      <version>2.8</version>
+    </dependency>
+    <dependency>
       <groupId>net.hydromatic</groupId>
       <artifactId>optiq-core</artifactId>
     </dependency>
@@ -81,17 +101,11 @@
       <version>2.3.19</version>
     </dependency>
     <dependency>
-      <groupId>com.google.caliper</groupId>
-      <artifactId>caliper</artifactId>
-      <version>1.0-beta-1</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>pentaho</groupId>
       <artifactId>mondrian-data-foodmart-json</artifactId>
       <version>0.3.2</version>
       <scope>test</scope>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
@@ -220,9 +234,9 @@
       <version>6.1.26</version>
     </dependency>
     <dependency>
-        <groupId>joda-time</groupId>
-        <artifactId>joda-time</artifactId>
-        <version>2.3</version>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.3</version>
     </dependency>
   </dependencies>
 
@@ -238,7 +252,18 @@
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-core</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-json</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
+
       </dependencies>
     </profile>
     <profile>
@@ -343,7 +368,7 @@
             <configuration>
               <tasks>
                 <copy todir="${project.build.directory}/codegen">
-                  <fileset dir="src/main/codegen"/>
+                  <fileset dir="src/main/codegen" />
                 </copy>
               </tasks>
             </configuration>
@@ -351,8 +376,8 @@
         </executions>
       </plugin>
       <plugin>
-        <!-- Extract parser grammar template from optiq-core.jar and put it under
-             ${project.build.directory} where all freemarker templates are. -->
+        <!-- Extract parser grammar template from optiq-core.jar and put 
+          it under ${project.build.directory} where all freemarker templates are. -->
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
         <version>2.8</version>
@@ -477,7 +502,8 @@
     </plugins>
     <pluginManagement>
       <plugins>
-        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+        <!--This plugin's configuration is used to store Eclipse m2e settings 
+          only. It has no influence on the Maven build itself. -->
         <plugin>
           <groupId>org.eclipse.m2e</groupId>
           <artifactId>lifecycle-mapping</artifactId>
@@ -495,10 +521,10 @@
                     </goals>
                   </pluginExecutionFilter>
                   <action>
-                    <execute >
+                    <execute>
                       <runOnIncremental>false</runOnIncremental>
                       <runOnConfiguration>true</runOnConfiguration>
-                   </execute >
+                    </execute>
                   </action>
                 </pluginExecution>
               </pluginExecutions>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index fc650b9..3b87dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -38,9 +38,9 @@ import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.Property;
-import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 7967957..99e0c80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
-import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.util.VectorUtil;
@@ -149,19 +149,19 @@ public class QuerySubmitter {
     PrintingResultsListener listener;
 
     String[] queries;
-    UserProtos.QueryType queryType;
+    QueryType queryType;
     type = type.toLowerCase();
     switch(type) {
       case "sql":
-        queryType = UserProtos.QueryType.SQL;
+        queryType = QueryType.SQL;
         queries = plan.trim().split(";");
         break;
       case "logical":
-        queryType = UserProtos.QueryType.LOGICAL;
+        queryType = QueryType.LOGICAL;
         queries = new String[]{ plan };
         break;
       case "physical":
-        queryType = UserProtos.QueryType.PHYSICAL;
+        queryType = QueryType.PHYSICAL;
         queries = new String[]{ plan };
         break;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index 068b7fd..205c3f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -17,16 +17,20 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.util.List;
+
 import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import com.google.hive12.common.collect.Lists;
 
 public class FragmentStats {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class);
-  
+
   private final static String METRIC_TIMER_FRAGMENT_TIME = MetricRegistry.name(FragmentExecutor.class,
       "completionTimes");
   private final static String METRIC_BATCHES_COMPLETED = MetricRegistry
@@ -35,28 +39,47 @@ public class FragmentStats {
       .name(FragmentExecutor.class, "recordsCompleted");
   private final static String METRIC_DATA_PROCESSED = MetricRegistry.name(FragmentExecutor.class, "dataProcessed");
 
-  
-  
+
+
   private final MetricRegistry metrics;
-  
+
+  private List<OperatorStats> operators = Lists.newArrayList();
+
   public final SingleThreadNestedCounter batchesCompleted;
   public final SingleThreadNestedCounter recordsCompleted;
   public final SingleThreadNestedCounter dataProcessed;
   public final Timer fragmentTime;
+  private final long startTime;
 
   public FragmentStats(MetricRegistry metrics) {
     this.metrics = metrics;
+    this.startTime = System.currentTimeMillis();
     this.fragmentTime = metrics.timer(METRIC_TIMER_FRAGMENT_TIME);
     this.batchesCompleted = new SingleThreadNestedCounter(metrics, METRIC_BATCHES_COMPLETED);
     this.recordsCompleted = new SingleThreadNestedCounter(metrics, METRIC_RECORDS_COMPLETED);
-    this.dataProcessed = new SingleThreadNestedCounter(metrics, METRIC_DATA_PROCESSED);    
+    this.dataProcessed = new SingleThreadNestedCounter(metrics, METRIC_DATA_PROCESSED);
   }
-  
+
   public void addMetricsToStatus(FragmentStatus.Builder stats) {
     stats.setBatchesCompleted(batchesCompleted.get());
     stats.setDataProcessed(dataProcessed.get());
     stats.setRecordsCompleted(recordsCompleted.get());
+
+    MinorFragmentProfile.Builder prfB = MinorFragmentProfile.newBuilder();
+    prfB.setStartTime(startTime);
+    prfB.setEndTime(System.currentTimeMillis());
+
+    for(OperatorStats o : operators){
+      prfB.addOperatorProfile(o.getProfile());
+    }
+
+    stats.setProfile(prfB);
+  }
+
+  public OperatorStats getOperatorStats(OpProfileDef profileDef){
+    OperatorStats stats = new OperatorStats(profileDef);
+    operators.add(stats);
+    return stats;
   }
-  
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java
new file mode 100644
index 0000000..e08a2b2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/MetricDef.java
@@ -0,0 +1,6 @@
+package org.apache.drill.exec.ops;
+
+public interface MetricDef {
+  public String name();
+  public int metricId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
new file mode 100644
index 0000000..61f6d20
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
@@ -0,0 +1,20 @@
+package org.apache.drill.exec.ops;
+
+public class OpProfileDef {
+
+  public int operatorId;
+  public int operatorType;
+  public int incomingCount;
+
+  public int getOperatorId(){
+    return operatorId;
+  }
+
+  public int getOperatorType(){
+    return operatorType;
+  }
+  public int getIncomingCount(){
+    return incomingCount;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 3b7b4c1..824ec6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.util.Iterator;
+
 import org.apache.drill.common.util.Hook.Closeable;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -28,10 +30,29 @@ public class OperatorContext implements Closeable {
   private final BufferAllocator allocator;
   private boolean closed = false;
   private PhysicalOperator popConfig;
+  private int operatorId;
+  private OperatorStats stats;
 
   public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
     this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
     this.popConfig = popConfig;
+    this.operatorId = popConfig.getOperatorId();
+
+    OpProfileDef def = new OpProfileDef();
+    def.operatorId = operatorId;
+    def.incomingCount = getChildCount(popConfig);
+    def.operatorType = popConfig.getOperatorType();
+    this.stats = context.getStats().getOperatorStats(def);
+  }
+
+  private static int getChildCount(PhysicalOperator popConfig){
+    Iterator<PhysicalOperator> iter = popConfig.iterator();
+    int i = 0;
+    while(iter.hasNext()){
+      iter.next();
+      i++;
+    }
+    return i;
   }
 
   public BufferAllocator getAllocator() {
@@ -57,4 +78,8 @@ public class OperatorContext implements Closeable {
     }
     closed = true;
   }
+
+  public OperatorStats getStats(){
+    return stats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index ce73a0d..dc463b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -17,10 +17,119 @@
  */
 package org.apache.drill.exec.ops;
 
+import org.apache.commons.collections.Buffer;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+
+import com.carrotsearch.hppc.IntDoubleOpenHashMap;
+import com.carrotsearch.hppc.IntLongOpenHashMap;
+
 public class OperatorStats {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
-  
-  private long batchesCompleted;
-  private long dataCompleted;
-  
+
+  private final int operatorId;
+  private final int operatorType;
+
+  private IntLongOpenHashMap longMetrics = new IntLongOpenHashMap();
+  private IntDoubleOpenHashMap doubleMetrics = new IntDoubleOpenHashMap();
+
+  public long[] recordsReceivedByInput;
+  public long[] batchesReceivedByInput;
+  private long[] schemaCountByInput;
+
+  private long batchesOutput;
+  private long recordsOutput;
+
+  private boolean inProcessing = false;
+  private boolean inSetup = false;
+
+  private long processingNanos;
+  private long setupNanos;
+
+  private long processingMark;
+  private long setupMark;
+
+  private long schemas;
+
+  public OperatorStats(OpProfileDef def){
+    this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount());
+  }
+
+  private OperatorStats(int operatorId, int operatorType, int inputCount) {
+    super();
+    this.operatorId = operatorId;
+    this.operatorType = operatorType;
+    this.recordsReceivedByInput = new long[inputCount];
+    this.batchesReceivedByInput = new long[inputCount];
+  }
+
+  public void startSetup() {
+    assert !inSetup;
+    stopProcessing();
+    inSetup = true;
+    setupMark = System.nanoTime();
+  }
+
+  public void stopSetup() {
+    assert inSetup;
+    startProcessing();
+    setupNanos += System.nanoTime() - setupMark;
+    inSetup = false;
+  }
+
+  public void startProcessing() {
+    assert !inProcessing;
+    processingMark = System.nanoTime();
+    inProcessing = true;
+  }
+
+  public void stopProcessing() {
+    assert inProcessing;
+    processingNanos += System.nanoTime() - processingMark;
+  }
+
+  public void batchReceived(int inputIndex, long records, boolean newSchema) {
+    recordsReceivedByInput[inputIndex] += records;
+    batchesReceivedByInput[inputIndex]++;
+    if(newSchema){
+      schemaCountByInput[inputIndex]++;
+    }
+  }
+
+  public OperatorProfile getProfile() {
+    OperatorProfile.Builder b = OperatorProfile //
+        .newBuilder() //
+        .setOperatorType(operatorType) //
+        .setOperatorId(operatorId) //
+        .setOutputProfile(StreamProfile.newBuilder().setBatches(batchesOutput).setRecords(recordsOutput)) //
+        .setSetupNanos(setupNanos) //
+        .setProcessNanos(processingNanos);
+
+    for(int i = 0; i < recordsReceivedByInput.length; i++){
+      b.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i]));
+    }
+
+    for(int i =0; i < longMetrics.allocated.length; i++){
+      if(longMetrics.allocated[i]){
+        b.addMetricBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]);
+      }
+    }
+
+    for(int i =0; i < doubleMetrics.allocated.length; i++){
+      if(doubleMetrics.allocated[i]){
+        b.addMetricBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]);
+      }
+    }
+
+    return b.build();
+  }
+
+  public void addLongStat(MetricDef metric, long value){
+    longMetrics.putOrAdd(metric.metricId(), value, value);
+  }
+
+  public void addDoubleStat(MetricDef metric, double value){
+    doubleMetrics.putOrAdd(metric.metricId(), value, value);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
index a09eb14..4a918b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -47,20 +47,20 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
 
   protected abstract void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ;
   protected abstract void setupReceivers(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ;
-  
+
   @Override
   public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException {
     this.senderMajorFragmentId = majorFragmentId;
     setupSenders(senderLocations);
   }
-  
+
 
   @Override
   public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
     this.receiverMajorFragmentId = majorFragmentId;
     setupReceivers(receiverLocations);
   }
-  
+
   @Override
   public OperatorCost getAggregateSendCost() {
     return getExchangeCost().getSendCost();
@@ -87,4 +87,10 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
     return getExchangeCost().getCombinedCost();
   }
 
+  @Override
+  public int getOperatorType() {
+    throw new UnsupportedOperationException();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 0a5c4fb..4978450 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -82,4 +82,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
     throw new UnsupportedOperationException(String.format("%s does not have exact column value count!", this.getClass().getCanonicalName()));
   }
 
+  @Override
+  public int getOperatorType() {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 483c364..4dd2aef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -104,4 +104,7 @@ public interface  PhysicalOperator extends GraphValue<PhysicalOperator> {
 
   @JsonProperty("@id")
   public void setOperatorId(int id);
+
+  @JsonIgnore
+  public int getOperatorType();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
index 9c0388a..88c40a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
@@ -21,10 +21,12 @@ package org.apache.drill.exec.physical.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import java.util.List;
 
@@ -63,4 +65,11 @@ public class BroadcastSender extends AbstractSender {
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitBroadcastSender(this, value);
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.BROADCAST_SENDER_VALUE;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index e02bb07..a9e38ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -20,11 +20,13 @@ package org.apache.drill.exec.physical.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import java.util.List;
 
@@ -56,14 +58,14 @@ public class ExternalSort extends Sort {
     long n = childSize.getRecordCount();
     long width = childSize.getRecordSize();
 
-    //TODO: Magic Number, let's assume 1/10 of data can fit in memory. 
+    //TODO: Magic Number, let's assume 1/10 of data can fit in memory.
     int k = 10;
     long n2 = n/k;
-    double cpuCost = 
-        k * n2 * (Math.log(n2)/Math.log(2)) + // 
+    double cpuCost =
+        k * n2 * (Math.log(n2)/Math.log(2)) + //
         n * (Math.log(k)/Math.log(2));
     double diskCost = n*width*2;
-    
+
     return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
   }
 
@@ -72,7 +74,10 @@ public class ExternalSort extends Sort {
     return new ExternalSort(child, orderings, reverse);
   }
 
-    
-  
-  
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.EXTERNAL_SORT_VALUE;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
index f125d05..99b5f46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Filter.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -36,7 +37,7 @@ public class Filter extends AbstractSingle {
 
   private final LogicalExpression expr;
   private final float selectivity;
-  
+
   @JsonCreator
   public Filter(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("selectivity") float selectivity) {
     super(child);
@@ -67,7 +68,7 @@ public class Filter extends AbstractSingle {
   public Size getSize() {
     return new Size( (long) (child.getSize().getRecordCount()*selectivity), child.getSize().getRecordSize());
   }
-   
+
   @Override
   public SelectionVectorMode getSVMode() {
     if (child.getSVMode().equals(SelectionVectorMode.FOUR_BYTE)) {
@@ -76,5 +77,11 @@ public class Filter extends AbstractSingle {
       return SelectionVectorMode.TWO_BYTE;
     }
   }
-  
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.FILTER_VALUE;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index eb77d78..d8fdc4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -42,7 +43,7 @@ public class HashAggregate extends AbstractSingle {
 
   // configuration parameters for the hash table
   private final HashTableConfig htConfig;
-  
+
   @JsonCreator
   public HashAggregate(@JsonProperty("child") PhysicalOperator child, @JsonProperty("keys") NamedExpression[] groupByExprs, @JsonProperty("exprs") NamedExpression[] aggrExprs, @JsonProperty("cardinality") float cardinality) {
     super(child);
@@ -50,12 +51,12 @@ public class HashAggregate extends AbstractSingle {
     this.aggrExprs = aggrExprs;
     this.cardinality = cardinality;
 
-    int initial_capacity = cardinality > HashTable.DEFAULT_INITIAL_CAPACITY ? 
+    int initial_capacity = cardinality > HashTable.DEFAULT_INITIAL_CAPACITY ?
       (int) cardinality : HashTable.DEFAULT_INITIAL_CAPACITY;
 
-    this.htConfig = new HashTableConfig(initial_capacity,                                        
-                                        HashTable.DEFAULT_LOAD_FACTOR, 
-                                        groupByExprs, 
+    this.htConfig = new HashTableConfig(initial_capacity,
+                                        HashTable.DEFAULT_LOAD_FACTOR,
+                                        groupByExprs,
                                         null /* no probe exprs */) ;
   }
 
@@ -90,8 +91,8 @@ public class HashAggregate extends AbstractSingle {
     int numExprs = getGroupByExprs().length;
 
     double cpuCost = n * numExprs * hashCpuCost;
-    double diskCost = 0;      // for now assume hash table fits in memory 
-        
+    double diskCost = 0;      // for now assume hash table fits in memory
+
     return new OperatorCost(0, (float) diskCost, (float) n*width, (float) cpuCost);
   }
 
@@ -99,7 +100,7 @@ public class HashAggregate extends AbstractSingle {
 	  logger.debug("HashAggregate cost: cpu = {}, disk = {}, memory = {}, network = {}.", HACost.getCpu(), HACost.getDisk(), HACost.getMemory(), HACost.getNetwork());
 	  logger.debug("Streaming aggregate cost: cpu = {}, disk = {}, memory = {}, network = {}.", SACost.getCpu(), SACost.getDisk(), SACost.getMemory(), SACost.getNetwork());
   }
-  
+
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
     return new HashAggregate(child, groupByExprs, aggrExprs, cardinality);
@@ -110,10 +111,13 @@ public class HashAggregate extends AbstractSingle {
     // not a great hack...
     return new Size( (long) (child.getSize().getRecordCount()*cardinality), child.getSize().getRecordSize());
   }
-  
-  
 
-  
-  
-  
+
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.HASH_AGGREGATE_VALUE;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 4ae27b8..1ef7e97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -37,9 +37,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
+
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.eigenbase.rel.JoinRelType;
 
 @JsonTypeName("hash-join")
@@ -119,4 +120,9 @@ public class HashJoinPOP extends AbstractBase {
             return this;
         }
     }
+
+    @Override
+    public int getOperatorType() {
+      return CoreOperatorType.HASH_JOIN_VALUE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
index 702c787..bdb1362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -59,11 +60,15 @@ public class HashPartitionSender extends AbstractSender {
   public LogicalExpression getExpr() {
     return expr;
   }
-  
+
   @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitHashPartitionSender(this, value);
   }
-  
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.HASH_PARTITION_SENDER_VALUE;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
index 67bba96..94bffea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
@@ -21,13 +21,14 @@ import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 public class IteratorValidator extends AbstractSingle{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidator.class);
 
   public IteratorValidator(PhysicalOperator child) {
     super(child);
-    
+
   }
 
   @Override
@@ -45,4 +46,8 @@ public class IteratorValidator extends AbstractSingle{
     return new IteratorValidator(child);
   }
 
+  @Override
+  public int getOperatorType() {
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
index 7d1d485..0db58ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 @JsonTypeName("limit")
@@ -68,4 +69,8 @@ public class Limit extends AbstractSingle {
     return SelectionVectorMode.TWO_BYTE;
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.LIMIT_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 264ee94..be9cf95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.eigenbase.rel.JoinRelType;
 
 import com.beust.jcommander.internal.Lists;
@@ -115,4 +116,9 @@ public class MergeJoinPOP extends AbstractBase{
     }
 
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.MERGE_JOIN_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 549c65c..da5e7e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.AbstractReceiver;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -82,4 +83,8 @@ public class MergingReceiverPOP extends AbstractReceiver{
     return orderings;
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.MERGING_RECEIVER_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index 55632a2..83076a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.beust.jcommander.internal.Lists;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -41,7 +42,7 @@ public class OrderedPartitionSender extends AbstractSender {
   private final FieldReference ref;
   private final List<DrillbitEndpoint> endpoints;
   private final int sendingWidth;
-  
+
   private int recordsToSample;
   private int samplingFactor;
   private float completionFactor;
@@ -90,7 +91,7 @@ public class OrderedPartitionSender extends AbstractSender {
   public OperatorCost getCost() {
     return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
   }
-  
+
   @Override
   public Size getSize() {
     //TODO: This should really change the row width...
@@ -115,4 +116,8 @@ public class OrderedPartitionSender extends AbstractSender {
     return completionFactor;
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.ORDERED_PARTITION_SENDER_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
index 0e6b0fd..8a3f5e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -36,7 +37,7 @@ public class Project extends AbstractSingle{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Project.class);
 
   private final List<NamedExpression> exprs;
-  
+
   @JsonCreator
   public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
     super(child);
@@ -56,7 +57,7 @@ public class Project extends AbstractSingle{
   public OperatorCost getCost() {
     return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
   }
-  
+
   @Override
   public Size getSize() {
     //TODO: This should really change the row width...
@@ -67,9 +68,14 @@ public class Project extends AbstractSingle{
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
     return new Project(exprs, child);
   }
-  
+
   @Override
   public SelectionVectorMode getSVMode() {
     return child.getSVMode();
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PROJECT_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
index 676de38..f517f7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -37,14 +38,14 @@ public class RandomReceiver extends AbstractReceiver{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
 
   private List<DrillbitEndpoint> senders;
-  
+
   @JsonCreator
   public RandomReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
                         @JsonProperty("senders") List<DrillbitEndpoint> senders) {
     super(oppositeMajorFragmentId);
     this.senders = senders;
   }
-  
+
   @Override
   @JsonProperty("senders")
   public List<DrillbitEndpoint> getProvidingEndpoints() {
@@ -62,7 +63,7 @@ public class RandomReceiver extends AbstractReceiver{
     return new OperatorCost(1,1,1,1);
   }
 
-  
+
   @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitRandomReceiver(this, value);
@@ -74,7 +75,8 @@ public class RandomReceiver extends AbstractReceiver{
     return new Size(1,1);
   }
 
-  
-
-  
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.RANDOM_RECEIVER_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
index 08a6c1b..c8c8f43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,7 +34,7 @@ public class RangeSender extends AbstractSender{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangeSender.class);
 
   List<EndpointPartition> partitions;
-  
+
   @JsonCreator
   public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) {
     super(oppositeMajorFragmentId, child);
@@ -55,7 +56,7 @@ public class RangeSender extends AbstractSender{
   public static class EndpointPartition{
     private final PartitionRange range;
     private final DrillbitEndpoint endpoint;
-    
+
     @JsonCreator
     public EndpointPartition(@JsonProperty("range") PartitionRange range, @JsonProperty("endpoint") DrillbitEndpoint endpoint) {
       super();
@@ -69,4 +70,9 @@ public class RangeSender extends AbstractSender{
       return endpoint;
     }
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.RANGE_SENDER_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index c4d78f9..4f5cb47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.physical.base.Root;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.Store;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -103,5 +104,9 @@ public class Screen extends AbstractStore {
     return physicalVisitor.visitScreen(this, value);
   }
 
-  
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.SCREEN_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java
index 5e891ec..13cbbe2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SelectionVectorRemover.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -32,7 +33,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class SelectionVectorRemover extends AbstractSingle {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorRemover.class);
-  
+
   @JsonCreator
   public SelectionVectorRemover(@JsonProperty("child") PhysicalOperator child) {
     super(child);
@@ -62,4 +63,9 @@ public class SelectionVectorRemover extends AbstractSingle {
   public SelectionVectorMode getSVMode() {
     return SelectionVectorMode.NONE;
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.SELECTION_VECTOR_REMOVER_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
index 9894164..82962ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.base.AbstractSender;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -40,7 +41,7 @@ public class SingleSender extends AbstractSender {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSender.class);
 
   private final DrillbitEndpoint destination;
-  
+
   @JsonCreator
   public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destination") DrillbitEndpoint destination) {
     super(oppositeMajorFragmentId, child);
@@ -68,11 +69,15 @@ public class SingleSender extends AbstractSender {
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitSingleSender(this, value);
   }
- 
+
 
   public DrillbitEndpoint getDestination() {
     return destination;
   }
- 
-  
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.SINGLE_SENDER_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index c9ac137..d7415d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,10 +35,10 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("sort")
 public class Sort extends AbstractSingle{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
-  
+
   protected final List<Ordering> orderings;
   protected boolean reverse = false;
-  
+
   @JsonCreator
   public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
     super(child);
@@ -64,14 +65,14 @@ public class Sort extends AbstractSingle{
     long n = childSize.getRecordCount();
     long width = childSize.getRecordSize();
 
-    //TODO: Magic Number, let's assume 1/10 of data can fit in memory. 
+    //TODO: Magic Number, let's assume 1/10 of data can fit in memory.
     int k = 10;
     long n2 = n/k;
-    double cpuCost = 
-        k * n2 * (Math.log(n2)/Math.log(2)) + // 
+    double cpuCost =
+        k * n2 * (Math.log(n2)/Math.log(2)) + //
         n * (Math.log(k)/Math.log(2));
     double diskCost = n*width*2;
-    
+
     return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
   }
 
@@ -84,6 +85,9 @@ public class Sort extends AbstractSingle{
   public SelectionVectorMode getSVMode() {
     return SelectionVectorMode.FOUR_BYTE;
   }
-  
-  
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.OLD_SORT_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
index 2dcdce1..6e06c24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -37,7 +38,7 @@ public class StreamingAggregate extends AbstractSingle {
   private final NamedExpression[] exprs;
 
   private final float cardinality;
-  
+
   @JsonCreator
   public StreamingAggregate(@JsonProperty("child") PhysicalOperator child, @JsonProperty("keys") NamedExpression[] keys, @JsonProperty("exprs") NamedExpression[] exprs, @JsonProperty("cardinality") float cardinality) {
     super(child);
@@ -74,10 +75,10 @@ public class StreamingAggregate extends AbstractSingle {
     // not a great hack...
     return new Size( (long) (child.getSize().getRecordCount()*cardinality), child.getSize().getRecordSize());
   }
-  
-  
 
-  
-  
-  
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.STREAMING_AGGREGATE_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
index 79c5782..dfd142a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java
@@ -20,12 +20,14 @@ package org.apache.drill.exec.physical.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import java.util.List;
 
@@ -64,14 +66,14 @@ public class TopN extends Sort {
     long n = childSize.getRecordCount();
     long width = childSize.getRecordSize();
 
-    //TODO: Magic Number, let's assume 1/10 of data can fit in memory. 
+    //TODO: Magic Number, let's assume 1/10 of data can fit in memory.
     int k = 10;
     long n2 = n/k;
-    double cpuCost = 
-        k * n2 * (Math.log(n2)/Math.log(2)) + // 
+    double cpuCost =
+        k * n2 * (Math.log(n2)/Math.log(2)) + //
         n * (Math.log(k)/Math.log(2));
     double diskCost = n*width*2;
-    
+
     return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
   }
 
@@ -80,7 +82,9 @@ public class TopN extends Sort {
     return new TopN(child, orderings, reverse, limit);
   }
 
-    
-  
-  
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.TOP_N_SORT_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
index a81d3e9..f4ba842 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -67,4 +68,9 @@ public class Trace extends AbstractSingle {
     protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
         return new Trace(child, traceTag);
     }
+
+    @Override
+    public int getOperatorType() {
+      return CoreOperatorType.TRACE_VALUE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
index fcefc37..522100f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.physical.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import java.util.List;
 
@@ -56,4 +58,8 @@ public class Union extends AbstractMultiple {
     return cost;
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.UNION_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 81a4d58..1113af4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -69,88 +69,97 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
 
   @Override
   public IterOutcome next() {
-    if(processed) {
-      // if the upstream record batch is already processed and next() is called by
-      // downstream then return NONE to indicate completion
-      return IterOutcome.NONE;
-    }
+    stats.startProcessing();
+    try{
 
-    // process the complete upstream in one next() call
-    IterOutcome upstream;
-    do {
-      upstream = incoming.next();
-      if(first && upstream == IterOutcome.OK)
-        upstream = IterOutcome.OK_NEW_SCHEMA;
-      first = false;
-
-      switch(upstream) {
-        case NOT_YET:
-        case NONE:
-        case STOP:
-          cleanup();
-          if (upstream == IterOutcome.STOP)
-            return upstream;
-          break;
-
-        case OK_NEW_SCHEMA:
-          try{
-            setupNewSchema();
-          }catch(Exception ex){
-            kill();
-            logger.error("Failure during query", ex);
-            context.fail(ex);
-            return IterOutcome.STOP;
-          }
-          // fall through.
-        case OK:
-          try {
-            counter += eventBasedRecordWriter.write();
-            logger.debug("Total records written so far: {}", counter);
-          } catch(IOException ex) {
-            throw new RuntimeException(ex);
-          }
-
-          for(VectorWrapper v : incoming)
-            v.getValueVector().clear();
-
-          break;
-
-        default:
-          throw new UnsupportedOperationException();
+      if(processed) {
+        // if the upstream record batch is already processed and next() is called by
+        // downstream then return NONE to indicate completion
+        return IterOutcome.NONE;
       }
-    } while(upstream != IterOutcome.NONE);
-
-    // Create two vectors for:
-    //   1. Fragment unique id.
-    //   2. Summary: currently contains number of records written.
-    MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR));
-    MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
-
-    VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
-    AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-    BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
-    AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
-
 
-    container.add(fragmentIdVector);
-    container.add(summaryVector);
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
-    fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
-    fragmentIdVector.getMutator().setValueCount(1);
-    summaryVector.getMutator().setSafe(0, counter);
-    summaryVector.getMutator().setValueCount(1);
-
-    container.setRecordCount(1);
-    processed = true;
+      // process the complete upstream in one next() call
+      IterOutcome upstream;
+      do {
+        upstream = next(incoming);
+        if(first && upstream == IterOutcome.OK)
+          upstream = IterOutcome.OK_NEW_SCHEMA;
+        first = false;
+
+        switch(upstream) {
+          case NOT_YET:
+          case NONE:
+          case STOP:
+            cleanup();
+            if (upstream == IterOutcome.STOP)
+              return upstream;
+            break;
+
+          case OK_NEW_SCHEMA:
+            try{
+              setupNewSchema();
+            }catch(Exception ex){
+              kill();
+              logger.error("Failure during query", ex);
+              context.fail(ex);
+              return IterOutcome.STOP;
+            }
+            // fall through.
+          case OK:
+            try {
+              counter += eventBasedRecordWriter.write();
+              logger.debug("Total records written so far: {}", counter);
+            } catch(IOException ex) {
+              throw new RuntimeException(ex);
+            }
+
+            for(VectorWrapper v : incoming)
+              v.getValueVector().clear();
+
+            break;
+
+          default:
+            throw new UnsupportedOperationException();
+        }
+      } while(upstream != IterOutcome.NONE);
+
+      // Create two vectors for:
+      //   1. Fragment unique id.
+      //   2. Summary: currently contains number of records written.
+      MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR));
+      MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT));
+
+      VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator());
+      AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+      BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator());
+      AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR)));
+
+
+      container.add(fragmentIdVector);
+      container.add(summaryVector);
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+      fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes());
+      fragmentIdVector.getMutator().setValueCount(1);
+      summaryVector.getMutator().setSafe(0, counter);
+      summaryVector.getMutator().setValueCount(1);
+
+      container.setRecordCount(1);
+      processed = true;
+
+      return IterOutcome.OK_NEW_SCHEMA;
+    }finally{
+      stats.stopProcessing();
+    }
 
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   protected void setupNewSchema() throws Exception {
     try {
       // update the schema in RecordWriter
+      stats.startSetup();
       recordWriter.updateSchema(incoming.getSchema());
+      stats.stopSetup();
     } catch(IOException ex) {
       throw new RuntimeException("Failed to update schema in RecordWriter", ex);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index aa6cd54..1a22f3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -76,14 +76,14 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   private TypedFieldId[] groupByOutFieldIds ;
   private TypedFieldId[] aggrOutFieldIds ;      // field ids for the outgoing batch
 
-  private final GeneratorMapping UPDATE_AGGR_INSIDE = 
-    GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */, 
+  private final GeneratorMapping UPDATE_AGGR_INSIDE =
+    GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
                             "resetValues" /* reset */, "cleanup" /* cleanup */) ;
 
-  private final GeneratorMapping UPDATE_AGGR_OUTSIDE = 
-    GeneratorMapping.create("setupInterior" /* setup method */, "outputRecordValues" /* eval method */, 
+  private final GeneratorMapping UPDATE_AGGR_OUTSIDE =
+    GeneratorMapping.create("setupInterior" /* setup method */, "outputRecordValues" /* eval method */,
                             "resetValues" /* reset */, "cleanup" /* cleanup */) ;
-   
+
   private final MappingSet UpdateAggrValuesMapping = new MappingSet("incomingRowIdx" /* read index */, "outRowIdx" /* write index */, "htRowIdx" /* workspace index */, "incoming" /* read container */, "outgoing" /* write container */, "aggrValuesContainer" /* workspace container */, UPDATE_AGGR_INSIDE, UPDATE_AGGR_OUTSIDE, UPDATE_AGGR_INSIDE);
 
 
@@ -100,72 +100,78 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   public IterOutcome next() {
-
-    // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null) {
-      IterOutcome outcome = incoming.next();
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()){
-          done = true;
-          return IterOutcome.STOP;
+    stats.startProcessing();
+    try{
+      // this is only called on the first batch. Beyond this, the aggregator manages batches.
+      if (aggregator == null) {
+        IterOutcome outcome = next(incoming);
+        logger.debug("Next outcome of {}", outcome);
+        switch (outcome) {
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          return outcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()){
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case OK:
+          throw new IllegalStateException("You should never get a first batch without a new schema");
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", outcome));
         }
-        break;
-      case OK:
-        throw new IllegalStateException("You should never get a first batch without a new schema");
-      default:
-        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
       }
-    }
 
 
-    if (aggregator.allFlushed()) {
-      return IterOutcome.NONE;
-    }
+      if (aggregator.allFlushed()) {
+        return IterOutcome.NONE;
+      }
+
+      logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
 
-    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
-    
-    while(true){
-      AggOutcome out = aggregator.doWork();
-      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
-      case CLEANUP_AND_RETURN:
-        container.clear();
-        aggregator.cleanup();
-        done = true;
-        return aggregator.getOutcome();
-      case RETURN_OUTCOME:
-        return aggregator.getOutcome();
-      case UPDATE_AGGREGATOR:
-        aggregator = null;
-        if(!createAggregator()){
-          return IterOutcome.STOP;
+      while(true){
+        AggOutcome out = aggregator.doWork();
+        logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+        switch(out){
+        case CLEANUP_AND_RETURN:
+          container.clear();
+          aggregator.cleanup();
+          done = true;
+          return aggregator.getOutcome();
+        case RETURN_OUTCOME:
+          return aggregator.getOutcome();
+        case UPDATE_AGGREGATOR:
+          aggregator = null;
+          if(!createAggregator()){
+            return IterOutcome.STOP;
+          }
+          continue;
+        default:
+          throw new IllegalStateException(String.format("Unknown state %s.", out));
         }
-        continue;
-      default:
-        throw new IllegalStateException(String.format("Unknown state %s.", out));
       }
+      }finally{
+      stats.stopProcessing();
     }
-    
   }
 
   /**
    * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
    * and informing the context of the failure state, as well is informing the upstream operators.
-   * 
+   *
    * @return true if the aggregator was setup successfully. false if there was a failure.
    */
   private boolean createAggregator() {
     logger.debug("Creating new aggregator.");
     try{
+      stats.startSetup();
       this.aggregator = createAggregatorInternal();
+      stats.stopSetup();
       return true;
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+      stats.stopSetup();
       context.fail(ex);
       container.clear();
       incoming.kill();
@@ -181,12 +187,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     container.clear();
     List<VectorAllocator> keyAllocators = Lists.newArrayList();
     List<VectorAllocator> valueAllocators = Lists.newArrayList();
-    
+
     int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().length : 0;
     int numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().length : 0;
     aggrExprs = new LogicalExpression[numAggrExprs];
     groupByOutFieldIds = new TypedFieldId[numGroupByExprs];
-    aggrOutFieldIds = new TypedFieldId[numAggrExprs];    
+    aggrOutFieldIds = new TypedFieldId[numAggrExprs];
 
     ErrorCollector collector = new ErrorCollectorImpl();
 
@@ -201,18 +207,18 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       keyAllocators.add(VectorAllocator.getAllocator(vv, 50));
 
-      // add this group-by vector to the output container 
+      // add this group-by vector to the output container
       groupByOutFieldIds[i] = container.add(vv);
     }
 
     for(i = 0; i < numAggrExprs; i++){
       NamedExpression ne = popConfig.getAggrExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
-  
+
       if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
 
       if(expr == null) continue;
-      
+
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       valueAllocators.add(VectorAllocator.getAllocator(vv, 50));
@@ -229,10 +235,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     HashAggregator agg = context.getImplementationClass(top);
 
     agg.setup(popConfig, context, oContext.getAllocator(), incoming, this,
-              aggrExprs, 
+              aggrExprs,
               cgInner.getWorkspaceTypes(),
               groupByOutFieldIds,
-              keyAllocators.toArray(new VectorAllocator[keyAllocators.size()]), 
+              keyAllocators.toArray(new VectorAllocator[keyAllocators.size()]),
               valueAllocators.toArray(new VectorAllocator[valueAllocators.size()]));
 
     return agg;
@@ -268,12 +274,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       cg.getBlock("getVectorIndex")._return(var.invoke("getIndex").arg(JExpr.direct("recordIndex")));;
       return;
     }
-     
+
     default:
       throw new IllegalStateException();
-      
+
     }
-   
+
   }
 
   @Override


Mime
View raw message