drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [16/24] git commit: diag wip
Date Thu, 22 May 2014 01:14:53 GMT
diag wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cb90852a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cb90852a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cb90852a

Branch: refs/heads/diagnostics2
Commit: cb90852a1c202ab5bbf64dd17b6ba367d6e7561c
Parents: e6121d0
Author: Jacques Nadeau <jacques@apache.org>
Authored: Mon May 19 16:39:36 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon May 19 16:39:36 2014 -0700

----------------------------------------------------------------------
 .../exec/cache/ProtobufDrillSerializable.java   |   73 +
 .../exec/cache/SerializationDefinition.java     |    4 +-
 .../drill/exec/cache/infinispan/ICache.java     |   19 +-
 .../infinispan/JacksonAdvancedExternalizer.java |   70 +
 .../ProtobufAdvancedExternalizer.java           |   67 +
 .../apache/drill/exec/ops/FragmentStats.java    |   36 +-
 .../apache/drill/exec/ops/OperatorContext.java  |    6 +-
 .../apache/drill/exec/ops/OperatorStats.java    |    8 +-
 .../drill/exec/physical/impl/ScreenCreator.java |   38 +-
 .../drill/exec/physical/impl/TraceInjector.java |    1 +
 .../partitionsender/OutgoingRecordBatch.java    |   12 +-
 .../PartitionSenderRootExec.java                |   13 +-
 .../partitionsender/PartitionSenderStats.java   |   15 +
 .../validate/IteratorValidatorInjector.java     |   15 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |   19 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |   10 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |    4 +-
 .../drill/exec/server/rest/DrillRestServer.java |    1 +
 .../drill/exec/server/rest/DrillRoot.java       |   53 +-
 .../drill/exec/util/JsonStringArrayList.java    |    2 +-
 .../drill/exec/util/JsonStringHashMap.java      |    2 +-
 .../org/apache/drill/exec/util/Pointer.java     |   11 +
 .../org/apache/drill/exec/work/WorkManager.java |   52 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   22 +-
 .../drill/exec/work/foreman/FragmentData.java   |   17 +-
 .../drill/exec/work/foreman/QueryManager.java   |   33 +-
 .../drill/exec/work/foreman/QueryStatus.java    |  130 +-
 .../work/fragment/AbstractStatusReporter.java   |   37 +-
 .../exec/work/fragment/FragmentExecutor.java    |   39 +-
 .../work/fragment/NonRootFragmentManager.java   |   13 +-
 .../exec/work/fragment/StatusReporter.java      |    2 +-
 .../src/main/resources/rest/status/list.ftl     |    9 +
 .../src/main/resources/rest/status/profile.ftl  |   19 +
 .../org/apache/drill/exec/proto/BitControl.java | 1263 ++-----------
 .../apache/drill/exec/proto/UserBitShared.java  | 1682 +++++++++++++-----
 protocol/src/main/protobuf/BitControl.proto     |   23 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   25 +-
 37 files changed, 2048 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
new file mode 100644
index 0000000..9d6b645
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Base class for cache-serializable wrappers around a single protobuf message.
+ *
+ * <p>The wrapped message is written and read using protobuf's length-delimited framing
+ * ({@code writeDelimitedTo} / {@code parseDelimitedFrom}).
+ *
+ * @param <T> protobuf message type being wrapped
+ */
+public abstract class ProtobufDrillSerializable<T extends Message> extends LoopedAbstractDrillSerializable implements DrillSerializable{
+  private final Parser<T> parser;
+  private T obj;
+
+  /**
+   * Wraps an existing message so that it can be written out.
+   *
+   * @param obj message to wrap; its own parser is reused for any later read
+   */
+  @SuppressWarnings("unchecked")
+  public ProtobufDrillSerializable(T obj){
+    // getParserForType() is declared with a wildcard type; protobuf-generated messages
+    // return a parser for their own concrete type, which is what makes this cast safe.
+    this.parser = (Parser<T>) obj.getParserForType();
+    this.obj = obj;
+  }
+
+  /**
+   * Creates an empty wrapper that must be populated through {@link #readFromStream(InputStream)}.
+   *
+   * @param parser parser used to decode the message on read
+   */
+  public ProtobufDrillSerializable(Parser<T> parser) {
+    this.parser = parser;
+  }
+
+  /**
+   * Replaces the wrapped message with one length-delimited message read from {@code input}.
+   *
+   * @throws IOException if the stream cannot be read or does not contain a valid message
+   */
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    obj = parser.parseDelimitedFrom(input);
+  }
+
+  /**
+   * Writes the wrapped message to {@code output} in length-delimited form.
+   *
+   * @throws IllegalStateException if no message has been set or read yet
+   * @throws IOException if the stream cannot be written
+   */
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    if (obj == null) {
+      // Reached when the parser-only constructor was used and nothing was read in.
+      throw new IllegalStateException("No protobuf message available to write.");
+    }
+    obj.writeDelimitedTo(output);
+  }
+
+  /** Returns the wrapped message, or {@code null} if none has been set or read. */
+  public T getObj() {
+    return obj;
+  }
+
+  /** Serializable wrapper for {@link QueryProfile} messages. */
+  public static class CQueryProfile extends ProtobufDrillSerializable<QueryProfile>{
+
+    /**
+     * Creates an empty wrapper to be filled by a read.
+     *
+     * @param allocator not used by this class
+     *        (NOTE(review): presumably kept to match a constructor shape expected elsewhere - confirm)
+     */
+    public CQueryProfile(BufferAllocator allocator){
+      super(QueryProfile.PARSER);
+    }
+
+    /** Creates an empty wrapper to be filled by a read. */
+    public CQueryProfile() {
+      super(QueryProfile.PARSER);
+    }
+
+    /** Wraps an existing profile for writing. */
+    public CQueryProfile(QueryProfile obj) {
+      super(obj);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
index 95ba434..711ddf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.cache;
 
 import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.server.options.OptionValue;
 
 public enum SerializationDefinition {
 
   OPTION(3002, OptionValue.class),
-  STORAGE_PLUGINS(3003, StoragePlugins.class)
+  STORAGE_PLUGINS(3003, StoragePlugins.class),
+  FRAGMENT_STATUS(3004, FragmentStatus.class)
   ;
   public final int id;
   public final Class<?> clazz;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index 5533dd7..92ce08d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -32,10 +32,12 @@ import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.DistributedMap;
 import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.cache.DrillSerializable;
+import org.apache.drill.exec.cache.SerializationDefinition;
 import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
 import org.apache.drill.exec.cache.ProtoSerializable.PlanFragmentSerializable;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.infinispan.Cache;
@@ -65,13 +67,18 @@ public class ICache implements DistributedCache{
 
   public ICache(DrillConfig config, BufferAllocator allocator) throws Exception {
     String clusterName = config.getString(ExecConstants.SERVICE_NAME);
-    GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
-    gcb.transport() //
-    .defaultTransport().clusterName(clusterName).build();
-    gcb.serialization() //
-    .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator));
+    GlobalConfiguration gc = new GlobalConfigurationBuilder() //
+      .transport() //
+        .defaultTransport() //
+        .clusterName(clusterName) //;
+        //
+      .serialization() //
+        .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator)) //
+        .addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.OPTION, config.getMapper())) //
+        .addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.STORAGE_PLUGINS, config.getMapper())) //
+        .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_STATUS, FragmentStatus.PARSER)) //
+      .build();
 
-    GlobalConfiguration gc = gcb.build();
     Configuration c = new ConfigurationBuilder() //
       .clustering() //
       .cacheMode(CacheMode.DIST_ASYNC) //

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
new file mode 100644
index 0000000..81f4877
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.exec.cache.SerializationDefinition;
+import org.infinispan.commons.marshall.AdvancedExternalizer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+/**
+ * Infinispan {@link AdvancedExternalizer} that marshals values as JSON through a Jackson
+ * {@link ObjectMapper}.
+ *
+ * <p>Each value is framed as a 4-byte length followed by the JSON bytes, so that
+ * {@link #readObject(ObjectInput)} consumes exactly the bytes that
+ * {@link #writeObject(ObjectOutput, Object)} produced.
+ *
+ * @param <T> type of the values handled by this externalizer
+ */
+public class JacksonAdvancedExternalizer<T> implements AdvancedExternalizer<T>  {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonAdvancedExternalizer.class);
+
+  private final Class<?> clazz;
+  private final ObjectMapper mapper;
+  private final int id;
+
+  /**
+   * @param def supplies the externalizer id and the class of the values being marshalled
+   * @param mapper Jackson mapper used for both serialization and deserialization
+   */
+  public JacksonAdvancedExternalizer(SerializationDefinition def, ObjectMapper mapper){
+    this.clazz =  def.clazz;
+    this.mapper = mapper;
+    this.id = def.id;
+  }
+
+  /**
+   * Reads one length-prefixed JSON document and binds it to the configured class.
+   *
+   * @throws IOException if the input ends early, the length is negative, or the JSON cannot be bound
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public T readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+    final int length = in.readInt();
+    if (length < 0) {
+      throw new IOException("Invalid serialized JSON length: " + length);
+    }
+    // Hand Jackson an exact byte array rather than the shared stream: a stream-backed
+    // parser reads through its own buffer and could consume bytes that follow this value.
+    final byte[] json = new byte[length];
+    in.readFully(json);
+    return (T) mapper.readValue(json, clazz);
+  }
+
+  /**
+   * Writes the value as a 4-byte length followed by its JSON bytes.
+   *
+   * @throws IOException if serialization or the underlying write fails
+   */
+  @Override
+  public void writeObject(ObjectOutput out, T object) throws IOException {
+    final byte[] json = mapper.writeValueAsBytes(object);
+    out.writeInt(json.length);
+    out.write(json);
+  }
+
+  /** Returns the externalizer id taken from the serialization definition. */
+  @Override
+  public Integer getId() {
+    return id;
+  }
+
+  /** Returns a singleton set holding the class taken from the serialization definition. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Set<Class<? extends T>> getTypeClasses() {
+    return (Set<Class<? extends T>>) (Object) Collections.singleton(clazz);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
new file mode 100644
index 0000000..821443a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.exec.cache.SerializationDefinition;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.infinispan.commons.marshall.AdvancedExternalizer;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Infinispan {@link AdvancedExternalizer} that marshals a single protobuf message type.
+ *
+ * <p>Each message is framed as a 4-byte length followed by the serialized message bytes, so
+ * that {@link #readObject(ObjectInput)} consumes exactly the bytes that
+ * {@link #writeObject(ObjectOutput, Message)} produced.
+ *
+ * @param <T> protobuf message type handled by this externalizer
+ */
+public class ProtobufAdvancedExternalizer<T extends Message> implements AdvancedExternalizer<T>  {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufAdvancedExternalizer.class);
+
+  private final Class<?> clazz;
+  private final int id;
+  private final Parser<T> parser;
+
+  /**
+   * @param def supplies the externalizer id and the class of the messages being marshalled
+   * @param parser protobuf parser used to rebuild messages on read
+   */
+  public ProtobufAdvancedExternalizer(SerializationDefinition def, Parser<T> parser){
+    this.clazz =  def.clazz;
+    this.parser = parser;
+    this.id = def.id;
+  }
+
+  /**
+   * Reads one length-prefixed message.
+   *
+   * @throws IOException if the input ends early, the length is negative, or the bytes are not a valid message
+   */
+  @Override
+  public T readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+    final int length = in.readInt();
+    if (length < 0) {
+      throw new IOException("Invalid serialized protobuf length: " + length);
+    }
+    final byte[] bytes = new byte[length];
+    in.readFully(bytes);
+    return parser.parseFrom(bytes);
+  }
+
+  /**
+   * Writes the message as a 4-byte length followed by its serialized bytes.
+   *
+   * @throws IOException if the underlying write fails
+   */
+  @Override
+  public void writeObject(ObjectOutput out, T object) throws IOException {
+    // The bytes from toByteArray() carry no end marker of their own, so the explicit
+    // length written here is what lets the reader know where the message stops.
+    final byte[] bytes = object.toByteArray();
+    out.writeInt(bytes.length);
+    out.write(bytes);
+  }
+
+  /** Returns the externalizer id taken from the serialization definition. */
+  @Override
+  public Integer getId() {
+    return id;
+  }
+
+  /** Returns a singleton set holding the class taken from the serialization definition. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Set<Class<? extends T>> getTypeClasses() {
+    return (Set<Class<? extends T>>) (Object) Collections.singleton(clazz);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index 205c3f1..24a02b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -19,66 +19,38 @@ package org.apache.drill.exec.ops;
 
 import java.util.List;
 
-import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
 
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
 import com.google.hive12.common.collect.Lists;
 
 public class FragmentStats {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class);
 
-  private final static String METRIC_TIMER_FRAGMENT_TIME = MetricRegistry.name(FragmentExecutor.class,
-      "completionTimes");
-  private final static String METRIC_BATCHES_COMPLETED = MetricRegistry
-      .name(FragmentExecutor.class, "batchesCompleted");
-  private final static String METRIC_RECORDS_COMPLETED = MetricRegistry
-      .name(FragmentExecutor.class, "recordsCompleted");
-  private final static String METRIC_DATA_PROCESSED = MetricRegistry.name(FragmentExecutor.class, "dataProcessed");
-
-
-
-  private final MetricRegistry metrics;
 
   private List<OperatorStats> operators = Lists.newArrayList();
-
-  public final SingleThreadNestedCounter batchesCompleted;
-  public final SingleThreadNestedCounter recordsCompleted;
-  public final SingleThreadNestedCounter dataProcessed;
-  public final Timer fragmentTime;
   private final long startTime;
 
   public FragmentStats(MetricRegistry metrics) {
-    this.metrics = metrics;
     this.startTime = System.currentTimeMillis();
-    this.fragmentTime = metrics.timer(METRIC_TIMER_FRAGMENT_TIME);
-    this.batchesCompleted = new SingleThreadNestedCounter(metrics, METRIC_BATCHES_COMPLETED);
-    this.recordsCompleted = new SingleThreadNestedCounter(metrics, METRIC_RECORDS_COMPLETED);
-    this.dataProcessed = new SingleThreadNestedCounter(metrics, METRIC_DATA_PROCESSED);
   }
 
-  public void addMetricsToStatus(FragmentStatus.Builder stats) {
-    stats.setBatchesCompleted(batchesCompleted.get());
-    stats.setDataProcessed(dataProcessed.get());
-    stats.setRecordsCompleted(recordsCompleted.get());
+  public void addMetricsToStatus(MinorFragmentProfile.Builder prfB) {
 
-    MinorFragmentProfile.Builder prfB = MinorFragmentProfile.newBuilder();
     prfB.setStartTime(startTime);
     prfB.setEndTime(System.currentTimeMillis());
 
     for(OperatorStats o : operators){
       prfB.addOperatorProfile(o.getProfile());
     }
-
-    stats.setProfile(prfB);
   }
 
   public OperatorStats getOperatorStats(OpProfileDef profileDef){
     OperatorStats stats = new OperatorStats(profileDef);
-    operators.add(stats);
+    if(profileDef.operatorType != -1){
+      operators.add(stats);
+    }
     return stats;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 824ec6a..116b616 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -30,16 +30,14 @@ public class OperatorContext implements Closeable {
   private final BufferAllocator allocator;
   private boolean closed = false;
   private PhysicalOperator popConfig;
-  private int operatorId;
   private OperatorStats stats;
 
   public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
     this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
     this.popConfig = popConfig;
-    this.operatorId = popConfig.getOperatorId();
 
     OpProfileDef def = new OpProfileDef();
-    def.operatorId = operatorId;
+    def.operatorId = popConfig.getOperatorId();
     def.incomingCount = getChildCount(popConfig);
     def.operatorType = popConfig.getOperatorType();
     this.stats = context.getStats().getOperatorStats(def);
@@ -52,6 +50,8 @@ public class OperatorContext implements Closeable {
       iter.next();
       i++;
     }
+
+    if(i == 0) i = 1;
     return i;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index bd00560..cde1876 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.ops;
 
 import org.apache.commons.collections.Buffer;
+import org.apache.drill.exec.proto.UserBitShared.MetricValue;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
 
@@ -37,8 +38,6 @@ public class OperatorStats {
   public long[] batchesReceivedByInput;
   private long[] schemaCountByInput;
 
-  private long batchesOutput;
-  private long recordsOutput;
 
   private boolean inProcessing = false;
   private boolean inSetup = false;
@@ -103,7 +102,6 @@ public class OperatorStats {
         .newBuilder() //
         .setOperatorType(operatorType) //
         .setOperatorId(operatorId) //
-        .setOutputProfile(StreamProfile.newBuilder().setBatches(batchesOutput).setRecords(recordsOutput)) //
         .setSetupNanos(setupNanos) //
         .setProcessNanos(processingNanos);
 
@@ -113,13 +111,13 @@ public class OperatorStats {
 
     for(int i =0; i < longMetrics.allocated.length; i++){
       if(longMetrics.allocated[i]){
-        b.addMetricBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]);
+        b.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]));
       }
     }
 
     for(int i =0; i < doubleMetrics.allocated.length; i++){
       if(doubleMetrics.allocated[i]){
-        b.addMetricBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]);
+        b.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index a0ff28a..c92633f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -40,28 +40,28 @@ import com.google.common.base.Preconditions;
 
 public class ScreenCreator implements RootCreator<Screen>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
-  
-  
-  
+
+
+
   @Override
   public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
     Preconditions.checkNotNull(children);
     Preconditions.checkArgument(children.size() == 1);
     return new ScreenRoot(context, children.iterator().next());
   }
-  
-  
+
+
   static class ScreenRoot implements RootExec{
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     volatile boolean ok = true;
-    
+
     private final SendingAccountor sendCount = new SendingAccountor();
-    
+
     final RecordBatch incoming;
     final FragmentContext context;
     final UserClientConnection connection;
     private RecordMaterializer materializer;
-    
+
     public ScreenRoot(FragmentContext context, RecordBatch incoming){
       assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
 
@@ -69,7 +69,7 @@ public class ScreenCreator implements RootCreator<Screen>{
       this.incoming = incoming;
       this.connection = context.getConnection();
     }
-    
+
     @Override
     public boolean next() {
       if(!ok){
@@ -97,7 +97,7 @@ public class ScreenCreator implements RootCreator<Screen>{
       }
       case NONE: {
         sendCount.waitForSendComplete();
-        context.getStats().batchesCompleted.inc(1);
+//        context.getStats().batchesCompleted.inc(1);
         QueryResult header = QueryResult.newBuilder() //
             .setQueryId(context.getHandle().getQueryId()) //
             .setRowCount(0) //
@@ -107,19 +107,19 @@ public class ScreenCreator implements RootCreator<Screen>{
         QueryWritableBatch batch = new QueryWritableBatch(header);
         connection.sendResult(listener, batch);
         sendCount.increment();
-        
+
         return false;
       }
       case OK_NEW_SCHEMA:
         materializer = new VectorRecordMaterializer(context, incoming);
         // fall through.
       case OK:
-        context.getStats().batchesCompleted.inc(1);
-        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
+//        context.getStats().batchesCompleted.inc(1);
+//        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
         QueryWritableBatch batch = materializer.convertNext(false);
         connection.sendResult(listener, batch);
         sendCount.increment();
-        
+
         return true;
       default:
         throw new UnsupportedOperationException();
@@ -133,7 +133,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     }
 
     private SendListener listener = new SendListener();
-    
+
     private class SendListener extends BaseRpcOutcomeListener<Ack>{
 
 
@@ -151,15 +151,15 @@ public class ScreenCreator implements RootCreator<Screen>{
         ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
         ok = false;
       }
-      
+
     }
 
     RecordBatch getIncoming() {
       return incoming;
     }
-    
-    
+
+
   }
-  
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 9c859a8..dddb53f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -84,6 +84,7 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
         /* Inject trace operator */
         if (list.size() > 0)
             newOp = op.getNewWithChildren(list);
+            newOp.setOperatorId(op.getOperatorId());
 
         return newOp;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index deef25f..646f1a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.ExecProtos;
@@ -43,7 +44,6 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.data.DataTunnel;
-import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.drill.exec.work.ErrorHelper;
@@ -74,8 +74,9 @@ public class OutgoingRecordBatch implements VectorAccessible {
   private int recordCapacity;
   private static int DEFAULT_ALLOC_SIZE = 20000;
   private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;
+  private OperatorStats stats;
 
-  public OutgoingRecordBatch(SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming,
+  public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming,
                              FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
     this.incoming = incoming;
     this.context = context;
@@ -83,13 +84,18 @@ public class OutgoingRecordBatch implements VectorAccessible {
     this.operator = operator;
     this.tunnel = tunnel;
     this.sendCount = sendCount;
+    this.stats = stats;
     this.oppositeMinorFragmentId = oppositeMinorFragmentId;
   }
 
   public void flushIfNecessary() {
     if (recordCount == recordCapacity) logger.debug("Flush is necesary:  Count is " + recordCount + ", capacity is " + recordCapacity);
     try {
-      if (recordCount == recordCapacity) flush();
+      if (recordCount == recordCapacity){
+        flush();
+        stats.addLongStat(PartitionSenderStats.BATCHES_SENT, 1l);
+        stats.addLongStat(PartitionSenderStats.RECORDS_SENT, recordCount);
+      }
     } catch (SchemaChangeException e) {
       incoming.kill();
       logger.error("Error flushing outgoing batches", e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bcd484c..d999019 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -29,17 +29,16 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -63,8 +62,8 @@ public class PartitionSenderRootExec implements RootExec {
   private FragmentContext context;
   private OperatorContext oContext;
   private boolean ok = true;
-  private AtomicLong batchesSent = new AtomicLong(0);
   private final SendingAccountor sendCount = new SendingAccountor();
+  private final OperatorStats stats;
 
 
   public PartitionSenderRootExec(FragmentContext context,
@@ -75,11 +74,12 @@ public class PartitionSenderRootExec implements RootExec {
     this.operator = operator;
     this.context = context;
     this.oContext = new OperatorContext(operator, context);
+    this.stats = oContext.getStats();
     this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
     int fieldId = 0;
     for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) {
       FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(operator.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
-      outgoing[fieldId] = new OutgoingRecordBatch(sendCount, operator,
+      outgoing[fieldId] = new OutgoingRecordBatch(stats, sendCount, operator,
                                                     context.getDataTunnel(endpoint, opposite),
                                                     incoming,
                                                     context,
@@ -91,6 +91,7 @@ public class PartitionSenderRootExec implements RootExec {
 
   @Override
   public boolean next() {
+    boolean newSchema = false;
 
     if (!ok) {
       stop();
@@ -122,6 +123,7 @@ public class PartitionSenderRootExec implements RootExec {
         return false;
 
       case OK_NEW_SCHEMA:
+        newSchema = true;
         try {
           // send all existing batches
           if (partitioner != null) {
@@ -139,12 +141,11 @@ public class PartitionSenderRootExec implements RootExec {
           return false;
         }
       case OK:
+        stats.batchReceived(0, incoming.getRecordCount(), newSchema);
         partitioner.partitionBatch(incoming);
         for (VectorWrapper v : incoming) {
           v.clear();
         }
-        context.getStats().batchesCompleted.inc(1);
-        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
         return true;
       case NOT_YET:
       default:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
new file mode 100644
index 0000000..99b9120
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
@@ -0,0 +1,15 @@
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import org.apache.drill.exec.ops.MetricDef;
+
+public enum PartitionSenderStats implements MetricDef {
+
+  BATCHES_SENT,
+  RECORDS_SENT;
+
+  @Override
+  public int metricId() {
+    return ordinal();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index aff71bf..428f335 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -35,16 +35,16 @@ public class IteratorValidatorInjector extends
   public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     IteratorValidatorInjector inject = new IteratorValidatorInjector();
     PhysicalOperator newOp = root.accept(inject, context);
-    
+
     if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen.");
 
     return (FragmentRoot) newOp;
-    
+
   }
 
   /**
    * Traverse the physical plan and inject the IteratorValidator operator after every operator.
-   * 
+   *
    * @param op
    *          Physical operator under which the IteratorValidator operator will be injected
    * @param context
@@ -61,12 +61,17 @@ public class IteratorValidatorInjector extends
 
     /* Get the list of child operators */
     for (PhysicalOperator child : op) {
-      newChildren.add(new IteratorValidator(child.accept(this, context)));
+      PhysicalOperator validator = new IteratorValidator(child.accept(this, context));
+      validator.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(validator);
     }
 
     /* Inject trace operator */
-    if (newChildren.size() > 0)
+    if (newChildren.size() > 0){
       newOp = op.getNewWithChildren(newChildren);
+      newOp.setOperatorId(op.getOperatorId());
+    }
+
 
     return newOp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 7477440..df66dcf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -32,7 +32,6 @@ import net.hydromatic.optiq.tools.ValidationException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
-import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillRuleSets;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
@@ -40,23 +39,15 @@ import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
 import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
-import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler;
-import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable;
-import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
-import org.apache.drill.exec.planner.sql.parser.SqlShowTables;
-import org.apache.drill.exec.planner.sql.parser.SqlUseSchema;
-import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.sql.parser.impl.DrillParserImpl;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.util.Pointer;
 import org.eigenbase.rel.RelCollationTraitDef;
-import org.eigenbase.rel.RelNode;
-import org.eigenbase.rel.metadata.RelMetadataQuery;
 import org.eigenbase.relopt.ConventionTraitDef;
 import org.eigenbase.relopt.RelOptCostFactory;
 import org.eigenbase.relopt.RelTraitDef;
 import org.eigenbase.sql.SqlNode;
 import org.eigenbase.sql.parser.SqlParseException;
-import org.eigenbase.sql2rel.StandardConvertletTable;
 
 public class DrillSqlWorker {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
@@ -76,7 +67,7 @@ public class DrillSqlWorker {
     traitDefs.add(RelCollationTraitDef.INSTANCE);
     this.context = context;
     DrillOperatorTable table = new DrillOperatorTable(context.getFunctionRegistry());
-    RelOptCostFactory costFactory = (context.getPlannerSettings().useDefaultCosting()) ? 
+    RelOptCostFactory costFactory = (context.getPlannerSettings().useDefaultCosting()) ?
         null : new DrillCostBase.DrillCostFactory() ;
     StdFrameworkConfig config = StdFrameworkConfig.newBuilder() //
         .lex(Lex.MYSQL) //
@@ -109,6 +100,10 @@ public class DrillSqlWorker {
   }
 
   public PhysicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException, IOException{
+    return getPlan(sql, null); // forward the SQL text; a null pointer means no text plan is captured
+  }
+
+  public PhysicalPlan getPlan(String sql, Pointer<String> textPlan) throws SqlParseException, ValidationException, RelConversionException, IOException{
     SqlNode sqlNode = planner.parse(sql);
 
     AbstractSqlHandler handler;
@@ -128,7 +123,7 @@ public class DrillSqlWorker {
       }
       // fallthrough
     default:
-      handler = new DefaultSqlHandler(planner, context);
+      handler = new DefaultSqlHandler(planner, context, textPlan);
     }
 
     return handler.getPlan(sqlNode);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 36ec0e8..e8bd837 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.SelectionVectorPrelVisitor;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.util.Pointer;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptUtil;
 import org.eigenbase.relopt.RelTraitSet;
@@ -59,11 +60,17 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
 
   protected final Planner planner;
   protected final QueryContext context;
+  private Pointer<String> textPlan;
 
   public DefaultSqlHandler(Planner planner, QueryContext context) {
+    this(planner, context, null);
+  }
+
+  public DefaultSqlHandler(Planner planner, QueryContext context, Pointer<String> textPlan) {
     super();
     this.planner = planner;
     this.context = context;
+    this.textPlan = textPlan;
   }
 
   protected void log(String name, RelNode node) {
@@ -73,8 +80,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
   }
 
   protected void log(String name, Prel node) {
+    if(textPlan != null) textPlan.value = PrelSequencer.printWithIds(node, SqlExplainLevel.ALL_ATTRIBUTES);
     if (logger.isDebugEnabled()) {
-      logger.debug(name + " : \n" + PrelSequencer.printWithIds(node, SqlExplainLevel.ALL_ATTRIBUTES));
+      logger.debug(name + " : \n" + (textPlan != null ? textPlan.value : PrelSequencer.printWithIds(node, SqlExplainLevel.ALL_ATTRIBUTES))); // textPlan is null via the two-arg constructor
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 8f533e3..a3307cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -64,7 +64,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
         .option(ChannelOption.SO_SNDBUF, 1 << 17) //
         .group(eventLoopGroup) //
         .childOption(ChannelOption.ALLOCATOR, alloc) //
-        .handler(new LoggingHandler(LogLevel.INFO)) //
+//        .handler(new LoggingHandler(LogLevel.INFO)) //
         .childHandler(new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
@@ -90,7 +90,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   }
 
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
-  
+
   @Override
   public boolean isClient() {
     return false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index dfc10c2..cfc3819 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -15,6 +15,7 @@ public class DrillRestServer extends ResourceConfig {
   public DrillRestServer(final WorkManager workManager) {
 //    registerClasses(HelloResource.class);
     register(JacksonFeature.class);
+    register(DrillRoot.class);
     register(FreemarkerMvcFeature.class);
     property(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, true);
     register(new AbstractBinder() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index dae6b43..27e0d17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -1,15 +1,60 @@
 package org.apache.drill.exec.server.rest;
 
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.ProtobufDrillSerializable.CQueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.work.WorkManager;
+import org.glassfish.jersey.server.mvc.Viewable;
+
+import com.google.hive12.hive12.common.collect.Lists;
 
 @Path("/")
 public class DrillRoot {
-    @GET
-    @Produces("text/plain")
-    public String getHello() {
-      return "hello world";
+
+  @Inject WorkManager work;
+
+  @GET
+  @Path("status")
+  @Produces("text/plain")
+  public String getHello() {
+    return "running";
+  }
+
+  @GET
+  @Path("queries")
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getQueries() {
+    DistributedMap<CQueryProfile> profiles = work.getContext().getCache().getNamedMap("sys.queries", CQueryProfile.class);
+
+    List<String> ids = Lists.newArrayList();
+    for(Map.Entry<String, CQueryProfile> entry : profiles){
+      ids.add(entry.getKey());
     }
 
+    return new Viewable("/rest/status/list.ftl", ids);
+  }
+
+  @GET
+  @Path("/query/{queryid}")
+  @Produces(MediaType.TEXT_HTML)
+  public Viewable getQuery(@PathParam("queryid") String queryId) {
+    DistributedMap<CQueryProfile> profiles = work.getContext().getCache().getNamedMap("sys.queries", CQueryProfile.class);
+    CQueryProfile c = profiles.get(queryId);
+    QueryProfile q = c == null ? QueryProfile.getDefaultInstance() : c.getObj();
+
+    return new Viewable("/rest/status/profile.ftl", q);
+
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java
index bc378a1..88478f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringArrayList.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.Text;
 
 import java.util.ArrayList;
 
-public class JsonStringArrayList extends ArrayList {
+public class JsonStringArrayList<E> extends ArrayList<E> {
 
   private static ObjectMapper mapper;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
index 124252a..4b264bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/JsonStringHashMap.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
  * Simple class that extends the regular java.util.HashMap but overrides the
  * toString() method of the HashMap class to produce a JSON string instead
  */
-public class JsonStringHashMap extends HashMap {
+public class JsonStringHashMap<K, V> extends HashMap<K, V> {
 
   private static ObjectMapper mapper;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Pointer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Pointer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Pointer.java
new file mode 100644
index 0000000..8139943
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Pointer.java
@@ -0,0 +1,11 @@
+package org.apache.drill.exec.util;
+
+public class Pointer<T> {
+  public T value;
+
+  public Pointer(){}
+
+  public Pointer(T value){
+    this.value = value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index b121586..45e7ee8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -41,6 +44,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.batch.ControlHandlerImpl;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.QueryStatus;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.user.UserWorker;
@@ -50,15 +54,17 @@ import com.google.common.collect.Queues;
 
 public class WorkManager implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
-  
+
   private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps.<FragmentManager, Boolean> newConcurrentMap());
 
   private PriorityBlockingQueue<Runnable> pendingTasks = Queues.newPriorityBlockingQueue();
-  
+
   private Map<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
-  
+
   private ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();
 
+  private ConcurrentMap<QueryId, QueryStatus> status = Maps.newConcurrentMap();
+
   private BootStrapContext bContext;
   private DrillbitContext dContext;
 
@@ -69,7 +75,7 @@ public class WorkManager implements Closeable{
   private final WorkEventBus workBus;
   private ExecutorService executor;
   private final EventThread eventThread;
-  
+
   public WorkManager(BootStrapContext context){
     this.bee = new WorkerBee();
     this.workBus = new WorkEventBus(bee);
@@ -79,22 +85,22 @@ public class WorkManager implements Closeable{
     this.eventThread = new EventThread();
     this.dataHandler = new DataResponseHandlerImpl(bee);
   }
-  
+
   public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord){
     this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus);
  //   executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
     executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
     eventThread.start();
   }
-  
+
   public WorkEventBus getWorkBus(){
     return workBus;
   }
-  
+
   public DataResponseHandler getDataHandler() {
     return dataHandler;
   }
-  
+
   public ControlMessageHandler getControlMessageHandler(){
     return controlMessageWorker;
   }
@@ -102,7 +108,7 @@ public class WorkManager implements Closeable{
   public UserWorker getUserWorker(){
     return userWorker;
   }
-  
+
   @Override
   public void close() throws IOException {
     try {
@@ -111,7 +117,7 @@ public class WorkManager implements Closeable{
       logger.warn("Executor interrupted while awaiting termination");
     }
   }
-  
+
 
   public DrillbitContext getContext() {
     return dContext;
@@ -124,7 +130,7 @@ public class WorkManager implements Closeable{
       logger.debug("Adding pending task {}", runner);
       pendingTasks.add(runner);
     }
-    
+
     public void addNewForeman(Foreman foreman){
       pendingTasks.add(foreman);
     }
@@ -133,20 +139,20 @@ public class WorkManager implements Closeable{
     public void addFragmentPendingRemote(FragmentManager handler){
       incomingFragments.add(handler);
     }
-    
+
     public void startFragmentPendingRemote(FragmentManager handler){
       incomingFragments.remove(handler);
       pendingTasks.add(handler.getRunnable());
     }
-    
+
     public FragmentExecutor getFragmentRunner(FragmentHandle handle){
       return runningFragments.get(handle);
     }
-    
+
     public Foreman getForemanForQueryId(QueryId queryId){
       return queries.get(queryId);
     }
-    
+
     public void retireForeman(Foreman foreman){
       queries.remove(foreman.getQueryId(), foreman);
     }
@@ -173,18 +179,18 @@ public class WorkManager implements Closeable{
       Runnable r = pendingTasks.take();
       if(r != null){
         logger.debug("Starting pending task {}", r);
-        executor.execute(r);  
+        executor.execute(r);
       }
-      
+
     }
     } catch (InterruptedException e) {
       logger.info("Work Manager stopping as it was interrupted.");
     }
   }
-   
-   
+
+
  }
 
-  
-  
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 3e8d3e1..eb1d738 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,23 +21,13 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
-import com.fasterxml.jackson.databind.JsonNode;
-
-import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.QueryContext;
@@ -55,28 +45,24 @@ import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.util.AtomicState;
-import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.ErrorHelper;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 
 /**
@@ -99,7 +85,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     this.queryRequest = queryRequest;
     this.context = new QueryContext(connection.getSession(), queryId, dContext);
     this.initiatingClient = connection;
-    this.fragmentManager = new QueryManager(queryId, bee.getContext().getCache(), new ForemanManagerListener(), dContext.getController());
+    this.fragmentManager = new QueryManager(queryId, queryRequest, bee.getContext().getCache(), new ForemanManagerListener(), dContext.getController());
     this.bee = bee;
 
     this.state = new AtomicState<QueryState>(QueryState.PENDING) {
@@ -335,7 +321,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   private void runSQL(String sql) {
     try{
       DrillSqlWorker sqlWorker = new DrillSqlWorker(context);
-      PhysicalPlan plan = sqlWorker.getPlan(sql);
+      Pointer<String> textPlan = new Pointer<>();
+      PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
+      fragmentManager.getStatus().setPlanText(textPlan.value);
       runPhysicalPlan(plan);
     }catch(Exception e){
       fail("Failure while parsing sql.", e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 509000f..3bbe692 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -1,9 +1,10 @@
 package org.apache.drill.exec.work.foreman;
 
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 
 public class FragmentData {
   private final boolean isLocal;
@@ -13,7 +14,8 @@ public class FragmentData {
 
   public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
     super();
-    this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
+    MinorFragmentProfile f = MinorFragmentProfile.newBuilder().setState(FragmentState.SENDING).build();
+    this.status = FragmentStatus.newBuilder().setHandle(handle).setProfile(f).build();
     this.endpoint = endpoint;
     this.isLocal = isLocal;
   }
@@ -39,5 +41,16 @@ public class FragmentData {
     return endpoint;
   }
 
+  public FragmentHandle getHandle(){
+    return status.getHandle();
+  }
+
+  @Override
+  public String toString() {
+    return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate
+        + ", endpoint=" + endpoint + "]";
+  }
+
+
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 62d9375..586b221 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.Controller;
@@ -58,21 +59,32 @@ public class QueryManager implements FragmentStatusListener{
   private ForemanManagerListener foreman;
   private AtomicInteger remainingFragmentCount;
   private WorkEventBus workBus;
+  private QueryId queryId;
   private FragmentExecutor rootRunner;
-  private volatile QueryId queryId;
+  private RunQuery query;
 
-  public QueryManager(QueryId queryId, DistributedCache cache, ForemanManagerListener foreman, Controller controller) {
+  public QueryManager(QueryId id, RunQuery query, DistributedCache cache, ForemanManagerListener foreman, Controller controller) {
     super();
     this.foreman = foreman;
+    this.query = query;
+    this.queryId =  id;
     this.controller = controller;
     this.remainingFragmentCount = new AtomicInteger(0);
-    this.status = new QueryStatus(queryId, cache);
+    this.status = new QueryStatus(query, id, cache);
+  }
+
+  public QueryStatus getStatus(){
+    return status;
+  }
+
+  public void addTextPlan(String textPlan){
+
   }
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
     logger.debug("Setting up fragment runs.");
     remainingFragmentCount.set(leafFragments.size()+1);
-    queryId = rootFragment.getHandle().getQueryId();
+    assert queryId == rootFragment.getHandle().getQueryId();
     workBus = bee.getContext().getWorkBus();
 
     // set up the root fragment first so we'll have incoming buffers available.
@@ -84,7 +96,7 @@ public class QueryManager implements FragmentStatusListener{
       logger.debug("Setting buffers on root context.");
       rootContext.setBuffers(buffers);
       // add fragment to local node.
-      status.add(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      status.add(new FragmentData(rootFragment.getHandle(), null, true));
       logger.debug("Fragment added to local node.");
       rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment));
       RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
@@ -103,7 +115,7 @@ public class QueryManager implements FragmentStatusListener{
     // keep track of intermediate fragments (not root or leaf)
     for (PlanFragment f : intermediateFragments) {
       logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
-      status.add(f.getHandle(), new FragmentData(f.getHandle(), f.getAssignment(), false));
+      status.add(new FragmentData(f.getHandle(), f.getAssignment(), false));
     }
 
     // send remote (leaf) fragments.
@@ -116,7 +128,7 @@ public class QueryManager implements FragmentStatusListener{
 
   private void sendRemoteFragment(PlanFragment fragment){
     logger.debug("Sending remote fragment to node {} with data {}", fragment.getAssignment(), fragment.getFragmentJson());
-    status.add(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
+    status.add(new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
     FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
     controller.getTunnel(fragment.getAssignment()).sendFragment(listener, fragment);
   }
@@ -125,7 +137,7 @@ public class QueryManager implements FragmentStatusListener{
   @Override
   public void statusUpdate(FragmentStatus status) {
     logger.debug("New fragment status was provided to Foreman of {}", status);
-    switch(status.getState()){
+    switch(status.getProfile().getState()){
     case AWAITING_ALLOCATION:
       updateStatus(status);
       break;
@@ -142,7 +154,7 @@ public class QueryManager implements FragmentStatusListener{
       updateStatus(status);
       break;
     default:
-      throw new UnsupportedOperationException();
+      throw new UnsupportedOperationException(String.format("Received status of %s", status));
     }
   }
 
@@ -154,6 +166,7 @@ public class QueryManager implements FragmentStatusListener{
     updateStatus(status);
     int remaining = remainingFragmentCount.decrementAndGet();
     if(remaining == 0){
+      logger.info("Outcome status: {}", this.status);
       QueryResult result = QueryResult.newBuilder() //
               .setQueryState(QueryState.COMPLETED) //
               .setQueryId(queryId) //
@@ -165,7 +178,7 @@ public class QueryManager implements FragmentStatusListener{
   private void fail(FragmentStatus status){
     updateStatus(status);
     stopQuery();
-    QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
+    QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getProfile().getError()).build();
     foreman.cleanupAndSendResult(result);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
index 64ec671..991e9ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -1,33 +1,143 @@
 package org.apache.drill.exec.work.foreman;
 
-import java.util.Map;
-
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.ProtobufDrillSerializable.CQueryProfile;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
-import com.google.common.collect.Maps;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
 
 public class QueryStatus {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
 
-  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // doesn't need to be thread safe as map is generated in a single thread and then accessed by multiple threads for reads only.
+
+  // doesn't need to be thread safe as map is generated in a single thread and then accessed by multiple threads for reads only.
+  private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> map = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>();
 
   private final String queryId;
+  private final QueryId id;
+  private RunQuery query;
+  private String planText;
+
+  private final DistributedMap<CQueryProfile> profileCache;
 
-  public QueryStatus(QueryId id, DistributedCache cache){
+  public QueryStatus(RunQuery query, QueryId id, DistributedCache cache){
+    this.id = id;
+    this.query = query;
     this.queryId = QueryIdHelper.getQueryId(id);
-//    cache.getMultiMap(QueryStatus.class);
+    this.profileCache = cache.getNamedMap("sys.queries", CQueryProfile.class);
+  }
+
+  public void setPlanText(String planText){
+    this.planText = planText;
+    updateCache();
 
   }
+  void add(FragmentData data){
+    int majorFragmentId = data.getHandle().getMajorFragmentId();
+    int minorFragmentId = data.getHandle().getMinorFragmentId();
+    IntObjectOpenHashMap<FragmentData> minorMap = map.get(majorFragmentId);
+    if(minorMap == null){
+      minorMap = new IntObjectOpenHashMap<FragmentData>();
+      map.put(majorFragmentId, minorMap);
+    }
 
-  void add(FragmentHandle handle, FragmentData data){
-    if(map.put(handle,  data) != null) throw new IllegalStateException();
+    minorMap.put(minorFragmentId, data);
   }
 
   void update(FragmentStatus status){
-    map.get(status.getHandle()).setStatus(status);
+    int majorFragmentId = status.getHandle().getMajorFragmentId();
+    int minorFragmentId = status.getHandle().getMinorFragmentId();
+    map.get(majorFragmentId).get(minorFragmentId).setStatus(status);
+    updateCache();
+  }
+
+  private void updateCache(){
+    profileCache.put(queryId, new CQueryProfile(getAsProfile()));
+  }
+
+  public String toString(){
+    return map.toString();
+  }
+
+  public static class FragmentId{
+    int major;
+    int minor;
+
+    public FragmentId(FragmentStatus status){
+      this.major = status.getHandle().getMajorFragmentId();
+      this.minor = status.getHandle().getMinorFragmentId();
+    }
+
+    public FragmentId(FragmentData data){
+      this.major = data.getHandle().getMajorFragmentId();
+      this.minor = data.getHandle().getMinorFragmentId();
+    }
+
+    public FragmentId(int major, int minor) {
+      super();
+      this.major = major;
+      this.minor = minor;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + major;
+      result = prime * result + minor;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      FragmentId other = (FragmentId) obj;
+      if (major != other.major)
+        return false;
+      if (minor != other.minor)
+        return false;
+      return true;
+    }
+
+    public String toString(){
+      return major + ":" + minor;
+    }
+  }
+
+  public QueryProfile getAsProfile(){
+    QueryProfile.Builder b = QueryProfile.newBuilder();
+    b.setQuery(query.getPlan());
+    b.setType(query.getType());
+    if(planText != null) b.setPlan(planText);
+    b.setId(id);
+    for(int i = 0; i < map.allocated.length; i++){
+      if(map.allocated[i]){
+        int majorFragmentId = map.keys[i];
+        IntObjectOpenHashMap<FragmentData> minorMap = (IntObjectOpenHashMap<FragmentData>) ((Object[]) map.values)[i];
+
+        MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder();
+        fb.setMajorFragmentId(majorFragmentId);
+        for(int v = 0; v < minorMap.allocated.length; v++){
+          if(minorMap.allocated[v]){
+            FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
+            fb.addMinorFragmentProfile(data.getStatus().getProfile());
+          }
+        }
+        b.addFragmentProfile(fb);
+      }
+    }
+
+    return b.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index 105afc2..30e4b6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -19,31 +19,39 @@ package org.apache.drill.exec.work.fragment;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
+import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.work.ErrorHelper;
 
 public abstract class AbstractStatusReporter implements StatusReporter{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class);
-  
+
   private FragmentContext context;
   private volatile long startNanos;
-  
+
   public AbstractStatusReporter(FragmentContext context) {
     super();
     this.context = context;
   }
-  
+
   private  FragmentStatus.Builder getBuilder(FragmentState state){
+    return getBuilder(state, null, null);
+  }
+  private  FragmentStatus.Builder getBuilder(FragmentState state, String message, Throwable t){
     FragmentStatus.Builder status = FragmentStatus.newBuilder();
-    context.getStats().addMetricsToStatus(status);
-    status.setState(state);
-    status.setRunningTime(System.nanoTime() - startNanos);
+    MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
+    context.getStats().addMetricsToStatus(b);
+    b.setState(state);
+    if(t != null){
+      b.setError(ErrorHelper.logAndConvertError(context.getIdentity(), message, t, logger));
+    }
     status.setHandle(context.getHandle());
-    status.setMemoryUse(context.getAllocator().getAllocatedMemory());
+    b.setMemoryUsed(context.getAllocator().getAllocatedMemory());
+    status.setProfile(b);
     return status;
   }
-  
+
   @Override
   public void stateChanged(FragmentHandle handle, FragmentState newState) {
     FragmentStatus.Builder status = getBuilder(newState);
@@ -70,14 +78,14 @@ public abstract class AbstractStatusReporter implements StatusReporter{
       break;
     default:
       break;
-    
+
     }
   }
-  
+
   protected void awaitingAllocation(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
     statusChange(handle, statusBuilder.build());
   }
-  
+
   protected void running(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
     statusChange(handle, statusBuilder.build());
   }
@@ -89,13 +97,12 @@ public abstract class AbstractStatusReporter implements StatusReporter{
   protected void finished(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
     statusChange(handle, statusBuilder.build());
   }
-  
+
   protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
 
   @Override
   public final void fail(FragmentHandle handle, String message, Throwable excep) {
-    FragmentStatus.Builder status = getBuilder(FragmentState.FAILED);
-    status.setError(ErrorHelper.logAndConvertError(context.getIdentity(), message, excep, logger));
+    FragmentStatus.Builder status = getBuilder(FragmentState.FAILED, message, excep);
     fail(handle, status);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 7890fc9..70f5dd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -26,8 +25,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.CancelableQuery;
@@ -47,7 +45,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   private RootExec root;
   private final FragmentContext context;
   private final StatusReporter listener;
-  
+
   public FragmentExecutor(FragmentContext context, FragmentRoot rootOperator, StatusReporter listener){
     this.context = context;
     this.rootOperator = rootOperator;
@@ -56,11 +54,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
   @Override
   public FragmentStatus getStatus() {
-    return FragmentStatus.newBuilder() //
-        .setBatchesCompleted(context.getStats().batchesCompleted.get()) //
-        .setDataProcessed(context.getStats().dataProcessed.get()) //
-        .setMemoryUse(context.getAllocator().getAllocatedMemory()) //
-        .build();
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -71,7 +65,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   public UserClientConnection getClient(){
     return context.getConnection();
   }
-  
+
   @Override
   public void run() {
     final String originalThread = Thread.currentThread().getName();
@@ -81,7 +75,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
         context.getHandle().getMinorFragmentId()
         );
     Thread.currentThread().setName(newThreadName);
-    
+
     boolean closed = false;
     try {
       root = ImplCreator.getExec(context, rootOperator);
@@ -95,33 +89,32 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
       return;
     }
-    
-    Timer.Context t = context.getStats().fragmentTime.time();
-    
+
+
+
     // run the query until root.next returns false.
     try{
       while(state.get() == FragmentState.RUNNING_VALUE){
         if(!root.next()){
           if(context.isFailed()){
-            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);  
+            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
           }else{
             updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
           }
 
         }
       }
-      
+
       root.stop();
-      
+
       closed = true;
-      
+
       context.close();
     }catch(Exception ex){
       logger.debug("Caught exception while running fragment", ex);
       internalFail(ex);
     }finally{
       Thread.currentThread().setName(originalThread);
-      t.stop();
       if(!closed) try{
         context.close();
       }catch(RuntimeException e){
@@ -130,17 +123,17 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     }
     logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
   }
-  
+
   private void internalFail(Throwable excep){
     state.set(FragmentState.FAILED_VALUE);
     listener.fail(context.getHandle(), "Failure while running fragment.", excep);
   }
-  
+
   private void updateState(FragmentState update){
     state.set(update.getNumber());
     listener.stateChanged(context.getHandle(), update);
   }
-  
+
   private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
     boolean success = state.compareAndSet(current.getNumber(), update.getNumber());
     if (!success && exceptionOnFailure) {
@@ -161,5 +154,5 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   public FragmentContext getContext(){
     return context;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index c7c3439..84fb806 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -42,7 +42,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
  */
 public class NonRootFragmentManager implements FragmentManager {
   private final PlanFragment fragment;
-  private FragmentLeaf root;
+  private FragmentRoot root;
   private final IncomingBuffers buffers;
   private final StatusReporter runnerListener;
   private volatile FragmentExecutor runner;
@@ -50,7 +50,7 @@ public class NonRootFragmentManager implements FragmentManager {
   private final FragmentContext context;
   private final PhysicalPlanReader reader;
   private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
-  
+
   public NonRootFragmentManager(PlanFragment fragment, DrillbitContext context) throws FragmentSetupException{
     try{
       this.fragment = fragment;
@@ -82,14 +82,7 @@ public class NonRootFragmentManager implements FragmentManager {
     synchronized(this){
       if(runner != null) throw new IllegalStateException("Get Runnable can only be run once.");
       if(cancel) return null;
-      FragmentRoot fragRoot = null;
-      try {
-        fragRoot = reader.readFragmentOperator(fragment.getFragmentJson());
-      } catch (IOException e) {
-        runnerListener.fail(fragment.getHandle(), "Failure while setting up remote fragment.", e);
-        return null;
-      }
-      runner = new FragmentExecutor(context, fragRoot, runnerListener);
+      runner = new FragmentExecutor(context, root, runnerListener);
       return this.runner;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
index 45c1f5e..26b5d68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 
 /**
  * The status handler is responsible for receiving changes in fragment status and propagating them back to the foreman.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/resources/rest/status/list.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/status/list.ftl b/exec/java-exec/src/main/resources/rest/status/list.ftl
new file mode 100644
index 0000000..6bde590
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/status/list.ftl
@@ -0,0 +1,9 @@
+<html>
+Welcome to Drill! 
+<br />
+<#list model as query>
+
+<a href="/query/${query}">${query}</a><br />
+
+</#list>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb90852a/exec/java-exec/src/main/resources/rest/status/profile.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/status/profile.ftl b/exec/java-exec/src/main/resources/rest/status/profile.ftl
new file mode 100644
index 0000000..94ff6ef
--- /dev/null
+++ b/exec/java-exec/src/main/resources/rest/status/profile.ftl
@@ -0,0 +1,19 @@
+<html>
+<a href="/queries">back</a><br />
+
+<pre>
+${model.query}
+</pre> 
+<br /><br />
+<pre>
+${model.plan}
+</pre>
+
+
+<pre>
+${model.toString()}
+</pre>
+
+
+
+</html>
\ No newline at end of file


Mime
View raw message